Python audioop 模块,rms() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用audioop.rms()

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:avs    作者:respeaker    | 项目源码 | 文件源码
def main():
    audio = Audio(frames_size=1600)
    rms = RMS()

    audio.link(rms)

    audio.start()

    alarm = os.path.realpath(os.path.join(os.path.dirname(__file__), 'resources/alarm.mp3'))
    alarm_uri = 'file://{}'.format(alarm)

    player1 = Player()
    player2 = Player()

    while True:
        try:
            player1.play(alarm_uri)
            time.sleep(1)
            player2.play(alarm_uri)
            time.sleep(3)
        except KeyboardInterrupt:
            break

    audio.stop()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:streamtotext    作者:ibm-dev    | 项目源码 | 文件源码
def detect_squelch_level(self, detect_time=10, threshold=.8):
        start_time = time.time()
        end_time = start_time + detect_time
        audio_chunks = collections.deque()
        async with self._source.listen():
            async for block in self._source:
                if time.time() > end_time:
                    break
                even_iter = EvenChunkIterator(block, self._sample_size)
                try:
                    while time.time() < end_time:
                        audio_chunks.append(await even_iter.__anext__())
                except StopAsyncIteration:
                    pass

        rms_vals = [audioop.rms(x.audio, self._sample_width) for x in
                    audio_chunks
                    if len(x.audio) == self._sample_size * self._sample_width]
        level = sorted(rms_vals)[int(threshold * len(rms_vals)):][0]
        self.squelch_level = level
        return level
项目:discord_chat_bot    作者:chromaticity    | 项目源码 | 文件源码
def read(self, frame_size):
        self.frame_count += 1

        frame = self.buff.read(frame_size)

        if self.volume != 1:
            frame = self._frame_vol(frame, self.volume, maxv=2)

        if self.draw and not self.frame_count % self.frame_skip:
            # these should be processed for every frame, but "overhead"
            rms = audioop.rms(frame, 2)
            self.rmss.append(rms)

            max_rms = sorted(self.rmss)[-1]
            meter_text = 'avg rms: {:.2f}, max rms: {:.2f} '.format(self._avg(self.rmss), max_rms)
            self._pprint_meter(rms / max(1, max_rms), text=meter_text, shift=True)

        return frame
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_string(self):
        data = 'abcd'
        size = 2
        self.assertRaises(TypeError, audioop.getsample, data, size, 0)
        self.assertRaises(TypeError, audioop.max, data, size)
        self.assertRaises(TypeError, audioop.minmax, data, size)
        self.assertRaises(TypeError, audioop.avg, data, size)
        self.assertRaises(TypeError, audioop.rms, data, size)
        self.assertRaises(TypeError, audioop.avgpp, data, size)
        self.assertRaises(TypeError, audioop.maxpp, data, size)
        self.assertRaises(TypeError, audioop.cross, data, size)
        self.assertRaises(TypeError, audioop.mul, data, size, 1.0)
        self.assertRaises(TypeError, audioop.tomono, data, size, 0.5, 0.5)
        self.assertRaises(TypeError, audioop.tostereo, data, size, 0.5, 0.5)
        self.assertRaises(TypeError, audioop.add, data, data, size)
        self.assertRaises(TypeError, audioop.bias, data, size, 0)
        self.assertRaises(TypeError, audioop.reverse, data, size)
        self.assertRaises(TypeError, audioop.lin2lin, data, size, size)
        self.assertRaises(TypeError, audioop.ratecv, data, size, 1, 1, 1, None)
        self.assertRaises(TypeError, audioop.lin2ulaw, data, size)
        self.assertRaises(TypeError, audioop.lin2alaw, data, size)
        self.assertRaises(TypeError, audioop.lin2adpcm, data, size, None)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_issue7673(self):
        state = None
        for data, size in INVALID_DATA:
            size2 = size
            self.assertRaises(audioop.error, audioop.getsample, data, size, 0)
            self.assertRaises(audioop.error, audioop.max, data, size)
            self.assertRaises(audioop.error, audioop.minmax, data, size)
            self.assertRaises(audioop.error, audioop.avg, data, size)
            self.assertRaises(audioop.error, audioop.rms, data, size)
            self.assertRaises(audioop.error, audioop.avgpp, data, size)
            self.assertRaises(audioop.error, audioop.maxpp, data, size)
            self.assertRaises(audioop.error, audioop.cross, data, size)
            self.assertRaises(audioop.error, audioop.mul, data, size, 1.0)
            self.assertRaises(audioop.error, audioop.tomono, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.tostereo, data, size, 0.5, 0.5)
            self.assertRaises(audioop.error, audioop.add, data, data, size)
            self.assertRaises(audioop.error, audioop.bias, data, size, 0)
            self.assertRaises(audioop.error, audioop.reverse, data, size)
            self.assertRaises(audioop.error, audioop.lin2lin, data, size, size2)
            self.assertRaises(audioop.error, audioop.ratecv, data, size, 1, 1, 1, state)
            self.assertRaises(audioop.error, audioop.lin2ulaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2alaw, data, size)
            self.assertRaises(audioop.error, audioop.lin2adpcm, data, size, state)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_string(self):
        data = 'abcd'
        size = 2
        self.assertRaises(TypeError, audioop.getsample, data, size, 0)
        self.assertRaises(TypeError, audioop.max, data, size)
        self.assertRaises(TypeError, audioop.minmax, data, size)
        self.assertRaises(TypeError, audioop.avg, data, size)
        self.assertRaises(TypeError, audioop.rms, data, size)
        self.assertRaises(TypeError, audioop.avgpp, data, size)
        self.assertRaises(TypeError, audioop.maxpp, data, size)
        self.assertRaises(TypeError, audioop.cross, data, size)
        self.assertRaises(TypeError, audioop.mul, data, size, 1.0)
        self.assertRaises(TypeError, audioop.tomono, data, size, 0.5, 0.5)
        self.assertRaises(TypeError, audioop.tostereo, data, size, 0.5, 0.5)
        self.assertRaises(TypeError, audioop.add, data, data, size)
        self.assertRaises(TypeError, audioop.bias, data, size, 0)
        self.assertRaises(TypeError, audioop.reverse, data, size)
        self.assertRaises(TypeError, audioop.lin2lin, data, size, size)
        self.assertRaises(TypeError, audioop.ratecv, data, size, 1, 1, 1, None)
        self.assertRaises(TypeError, audioop.lin2ulaw, data, size)
        self.assertRaises(TypeError, audioop.lin2alaw, data, size)
        self.assertRaises(TypeError, audioop.lin2adpcm, data, size, None)
项目:synthesizer    作者:irmen    | 项目源码 | 文件源码
def __db_level(self, rms_mode=False):
        """
        Returns the average audio volume level measured in dB (range -60 db to 0 db)
        If the sample is stereo, you get back a tuple: (left_level, right_level)
        If the sample is mono, you still get a tuple but both values will be the same.
        This method is probably only useful if processed on very short sample fragments in sequence,
        so the db levels could be used to show a level meter for the duration of the sample.
        """
        maxvalue = 2**(8*self.__samplewidth-1)
        if self.nchannels == 1:
            if rms_mode:
                peak_left = peak_right = (audioop.rms(self.__frames, self.__samplewidth)+1)/maxvalue
            else:
                peak_left = peak_right = (audioop.max(self.__frames, self.__samplewidth)+1)/maxvalue
        else:
            left_frames = audioop.tomono(self.__frames, self.__samplewidth, 1, 0)
            right_frames = audioop.tomono(self.__frames, self.__samplewidth, 0, 1)
            if rms_mode:
                peak_left = (audioop.rms(left_frames, self.__samplewidth)+1)/maxvalue
                peak_right = (audioop.rms(right_frames, self.__samplewidth)+1)/maxvalue
            else:
                peak_left = (audioop.max(left_frames, self.__samplewidth)+1)/maxvalue
                peak_right = (audioop.max(right_frames, self.__samplewidth)+1)/maxvalue
        # cut off at the bottom at -60 instead of all the way down to -infinity
        return max(20.0*math.log(peak_left, 10), -60.0), max(20.0*math.log(peak_right, 10), -60.0)
项目:pyrecord    作者:Aakashdeveloper    | 项目源码 | 文件源码
def adjust_for_ambient_noise(self, source, duration = 1):
        """
        Adjusts the energy threshold dynamically using audio from ``source`` (an ``AudioSource`` instance) to account for ambient noise.

        Intended to calibrate the energy threshold with the ambient energy level. Should be used on periods of audio without speech - will stop early if any speech is detected.

        The ``duration`` parameter is the maximum number of seconds that it will dynamically adjust the threshold for before returning. This value should be at least 0.5 in order to get a representative sample of the ambient noise.
        """
        assert isinstance(source, AudioSource), "Source must be an audio source"
        assert self.pause_threshold >= self.non_speaking_duration >= 0

        seconds_per_buffer = (source.CHUNK + 0.0) / source.SAMPLE_RATE
        elapsed_time = 0

        # adjust energy threshold until a phrase starts
        while True:
            elapsed_time += seconds_per_buffer
            if elapsed_time > duration: break
            buffer = source.stream.read(source.CHUNK, exception_on_overflow=False)
            energy = audioop.rms(buffer, source.SAMPLE_WIDTH) # energy of the audio signal

            # dynamically adjust the energy threshold using assymmetric weighted average
            damping = self.dynamic_energy_adjustment_damping ** seconds_per_buffer # account for different chunk sizes and rates
            target_energy = energy * self.dynamic_energy_ratio
            self.energy_threshold = self.energy_threshold * damping + target_energy * (1 - damping)
项目:AlexaBot    作者:jacobajit    | 项目源码 | 文件源码
def adjust_for_ambient_noise(self, source, duration = 1):
        """
        Adjusts the energy threshold dynamically using audio from ``source`` (an ``AudioSource`` instance) to account for ambient noise.
        Intended to calibrate the energy threshold with the ambient energy level. Should be used on periods of audio without speech - will stop early if any speech is detected.
        The ``duration`` parameter is the maximum number of seconds that it will dynamically adjust the threshold for before returning. This value should be at least 0.5 in order to get a representative sample of the ambient noise.
        """
        assert isinstance(source, AudioSource), "Source must be an audio source"
        assert source.stream is not None, "Audio source must be entered before adjusting, see documentation for `AudioSource`; are you using `source` outside of a `with` statement?"
        assert self.pause_threshold >= self.non_speaking_duration >= 0

        seconds_per_buffer = (source.CHUNK + 0.0) / source.SAMPLE_RATE
        elapsed_time = 0

        # adjust energy threshold until a phrase starts
        while True:
            elapsed_time += seconds_per_buffer
            if elapsed_time > duration: break
            buffer = source.stream.read(source.CHUNK)
            energy = audioop.rms(buffer, source.SAMPLE_WIDTH) # energy of the audio signal

            # dynamically adjust the energy threshold using assymmetric weighted average
            damping = self.dynamic_energy_adjustment_damping ** seconds_per_buffer # account for different chunk sizes and rates
            target_energy = energy * self.dynamic_energy_ratio
            self.energy_threshold = self.energy_threshold * damping + target_energy * (1 - damping)
项目:mycroft-light    作者:MatthewScholefield    | 项目源码 | 文件源码
def _calc_energy(self, sound_chunk):
        """Calculates how loud the sound is"""
        return audioop.rms(sound_chunk, self.sample_width)
项目:dingdang-robot    作者:wzpan    | 项目源码 | 文件源码
def getScore(self, data):
        rms = audioop.rms(data, 2)
        score = rms / 3
        return score
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_rms(self):
        self.assertEqual(audioop.rms(data[0], 1), 1)
        self.assertEqual(audioop.rms(data[1], 2), 1)
        self.assertEqual(audioop.rms(data[2], 4), 1)
项目:avs    作者:respeaker    | 项目源码 | 文件源码
def put(self, data):
        print('RMS: {}'.format(audioop.rms(data, 2)))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_rms(self):
        for w in 1, 2, 4:
            self.assertEqual(audioop.rms(b'', w), 0)
            p = packs[w]
            self.assertEqual(audioop.rms(p(*range(100)), w), 57)
            self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
                                   maxvalues[w], delta=1)
            self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
                                   -minvalues[w], delta=1)
        self.assertEqual(audioop.rms(datas[1], 1), 77)
        self.assertEqual(audioop.rms(datas[2], 2), 20001)
        self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_rms(self):
        for w in 1, 2, 4:
            self.assertEqual(audioop.rms(b'', w), 0)
            p = packs[w]
            self.assertEqual(audioop.rms(p(*range(100)), w), 57)
            self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
                                   maxvalues[w], delta=1)
            self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
                                   -minvalues[w], delta=1)
        self.assertEqual(audioop.rms(datas[1], 1), 77)
        self.assertEqual(audioop.rms(datas[2], 2), 20001)
        self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def get_score(self, data):
        rms = audioop.rms(data, 2)
        score = rms / 3
        return score
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_rms(self):
        for w in 1, 2, 4:
            self.assertEqual(audioop.rms(b'', w), 0)
            p = packs[w]
            self.assertEqual(audioop.rms(p(*range(100)), w), 57)
            self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
                                   maxvalues[w], delta=1)
            self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
                                   -minvalues[w], delta=1)
        self.assertEqual(audioop.rms(datas[1], 1), 77)
        self.assertEqual(audioop.rms(datas[2], 2), 20001)
        self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
项目:streamtotext    作者:ibm-dev    | 项目源码 | 文件源码
def check_squelch(level, is_triggered, chunks):
        rms_vals = [audioop.rms(x.audio, x.width) for x in chunks]
        median_rms = sorted(rms_vals)[int(len(rms_vals) * .5)]
        if is_triggered:
            if median_rms < (level * .8):
                return False
            else:
                return True
        else:
            if median_rms > level:
                return True
            else:
                return False
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_rms(self):
        for w in 1, 2, 4:
            self.assertEqual(audioop.rms(b'', w), 0)
            p = packs[w]
            self.assertEqual(audioop.rms(p(*range(100)), w), 57)
            self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
                                   maxvalues[w], delta=1)
            self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
                                   -minvalues[w], delta=1)
        self.assertEqual(audioop.rms(datas[1], 1), 77)
        self.assertEqual(audioop.rms(datas[2], 2), 20001)
        self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_rms(self):
        for w in 1, 2, 3, 4:
            self.assertEqual(audioop.rms(b'', w), 0)
            self.assertEqual(audioop.rms(bytearray(), w), 0)
            self.assertEqual(audioop.rms(memoryview(b''), w), 0)
            p = packs[w]
            self.assertEqual(audioop.rms(p(*range(100)), w), 57)
            self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
                                   maxvalues[w], delta=1)
            self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
                                   -minvalues[w], delta=1)
        self.assertEqual(audioop.rms(datas[1], 1), 77)
        self.assertEqual(audioop.rms(datas[2], 2), 20001)
        self.assertEqual(audioop.rms(datas[3], 3), 5120523)
        self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def _snr(self, frames):
        rms = audioop.rms(b''.join(frames), int(self._input_bits / 8))
        if rms > 0 and self._threshold > 0:
            return 20.0 * math.log(rms / self._threshold, 10)
        else:
            return 0
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_rms(self):
        for w in 1, 2, 4:
            self.assertEqual(audioop.rms(b'', w), 0)
            p = packs[w]
            self.assertEqual(audioop.rms(p(*range(100)), w), 57)
            self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
                                   maxvalues[w], delta=1)
            self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
                                   -minvalues[w], delta=1)
        self.assertEqual(audioop.rms(datas[1], 1), 77)
        self.assertEqual(audioop.rms(datas[2], 2), 20001)
        self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_rms(self):
        for w in 1, 2, 3, 4:
            self.assertEqual(audioop.rms(b'', w), 0)
            self.assertEqual(audioop.rms(bytearray(), w), 0)
            self.assertEqual(audioop.rms(memoryview(b''), w), 0)
            p = packs[w]
            self.assertEqual(audioop.rms(p(*range(100)), w), 57)
            self.assertAlmostEqual(audioop.rms(p(maxvalues[w]) * 5, w),
                                   maxvalues[w], delta=1)
            self.assertAlmostEqual(audioop.rms(p(minvalues[w]) * 5, w),
                                   -minvalues[w], delta=1)
        self.assertEqual(audioop.rms(datas[1], 1), 77)
        self.assertEqual(audioop.rms(datas[2], 2), 20001)
        self.assertEqual(audioop.rms(datas[3], 3), 5120523)
        self.assertEqual(audioop.rms(datas[4], 4), 1310854152)
项目:synthesizer    作者:irmen    | 项目源码 | 文件源码
def rms(self):
        return audioop.rms(self.__frames, self.samplewidth)
项目:Friday    作者:Zenohm    | 项目源码 | 文件源码
def _apiai_stt(self):
        from math import log
        import audioop
        import pyaudio
        import time
        resampler = apiai.Resampler(source_samplerate=settings['RATE'])
        request = self.ai.voice_request()
        vad = apiai.VAD()

        def callback(in_data, frame_count):
            frames, data = resampler.resample(in_data, frame_count)
            if settings.show_decibels:
                decibel = 20 * log(audioop.rms(data, 2) + 1, 10)
                click.echo(decibel)
            state = vad.processFrame(frames)
            request.send(data)
            state_signal = pyaudio.paContinue if state == 1 else pyaudio.paComplete
            return in_data, state_signal

        p = pyaudio.PyAudio()
        stream = p.open(format=pyaudio.paInt32, input=True, output=False, stream_callback=callback,
                        channels=settings['CHANNELS'], rate=settings['RATE'], frames_per_buffer=settings['CHUNK'])
        stream.start_stream()
        click.echo("Speak!")
        while stream.is_active():
            time.sleep(0.1)
        stream.stop_stream()
        stream.close()
        p.terminate()
项目:MUSTani-Robot    作者:swayam01    | 项目源码 | 文件源码
def check_silence(self, buf):
        volume = audioop.rms(buf, 2)
        if (volume >= config.THRESHOLD):
            self.silence_timer = time.time()
            if (self.append == False):
                if (self.hatch.get('debug') == True):
                    print ('starting append mode')
                self.timer = time.time()
                for sbuf in self.silence_buffer:
                    self.prepare.prepare(sbuf, volume)
                self.silence_buffer = [ ]
            self.append = True
            self.silence_counter = 0
        else:
            self.silence_counter += 1
            self.silence_buffer.append(buf)
            if (len(self.silence_buffer) > 3):
                del self.silence_buffer[0]
        if (self.out != None and self.out.closed != True):
            self.out.write(buf)
        if (self.append == True):
            self.prepare.prepare(buf, volume)
        if (self.append == True and self.silence_timer > 0
        and self.silence_timer + config.MAX_SILENCE_AFTER_START < time.time()
        and self.live == True):
            self.stop("stop append mode because of silence")
        if (self.append == True and self.timer + config.MAX_TIME < time.time()
        and self.live == True):
            self.stop("stop append mode because time is up")
项目:audio-visualizer-screenlet    作者:ninlith    | 项目源码 | 文件源码
def rms(audio_fragment, sample_width_in_bytes=2):
    """Given audio fragment, return the root mean square."""
    # Equal to
    # return int(np.sqrt(np.mean(np.square(
    #     np.absolute(audio_fragment.astype(np.float))))))
    return audioop.rms(np.abs(audio_fragment), sample_width_in_bytes)
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def record(self):

        '''
        time.sleep(1)
        args = ['arecord', '-d', '5', self.voicefile]
        proc = subprocess.Popen(args)
        time.sleep(5)
        '''

        FORMAT = pyaudio.paInt16
        CHANNELS = 2
        RATE = 44100
        CHUNK = 1024
        RECORD_SECONDS = 5
        WAVE_OUTPUT_FILENAME = self.voicefile

        audio = pyaudio.PyAudio()

        # start Recording
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK)
        print "recording..."
        frames = []

        threshold = 1000
        for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            data = stream.read(CHUNK)
            rms = audioop.rms(data,2)
            if rms > threshold:
                print "I am hearing you now"
            frames.append(data)
        print "finished recording"

        # stop Recording
        stream.stop_stream()
        stream.close()
        audio.terminate()

        waveFile = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
        waveFile.setnchannels(CHANNELS)
        waveFile.setsampwidth(audio.get_sample_size(FORMAT))
        waveFile.setframerate(RATE)
        waveFile.writeframes(b''.join(frames))
        waveFile.close()
项目:CodeLabs    作者:TheIoTLearningInitiative    | 项目源码 | 文件源码
def record(self):

        '''
        time.sleep(1)
        args = ['arecord', '-d', '5', self.voicefile]
        proc = subprocess.Popen(args)
        time.sleep(5)
        '''

        FORMAT = pyaudio.paInt16
        CHANNELS = 2
        RATE = 44100
        CHUNK = 1024
        RECORD_SECONDS = 5
        WAVE_OUTPUT_FILENAME = self.voicefile

        audio = pyaudio.PyAudio()

        # start Recording
        stream = audio.open(format=FORMAT, channels=CHANNELS,
                        rate=RATE, input=True,
                        frames_per_buffer=CHUNK)
        print "recording..."
        frames = []

        threshold = 1000
        for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
            data = stream.read(CHUNK)
            rms = audioop.rms(data,2)
            if rms > threshold:
                print "I am hearing you now"
            frames.append(data)
        print "finished recording"

        # stop Recording
        stream.stop_stream()
        stream.close()
        audio.terminate()

        waveFile = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
        waveFile.setnchannels(CHANNELS)
        waveFile.setsampwidth(audio.get_sample_size(FORMAT))
        waveFile.setframerate(RATE)
        waveFile.writeframes(b''.join(frames))
        waveFile.close()
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def wait_for_keyword(self, keyword=None):
        if not keyword:
            keyword = self._keyword
        frame_queue = queue.Queue()
        keyword_uttered = threading.Event()

        # FIXME: not configurable yet
        num_worker_threads = 2

        for i in range(num_worker_threads):
            t = threading.Thread(target=self.check_for_keyword,
                                 args=(frame_queue, keyword_uttered, keyword))
            t.daemon = True
            t.start()

        frames = collections.deque([], 30)
        recording = False
        recording_frames = []
        self._logger.info("Waiting for keyword '%s'...", keyword)
        for frame in self._input_device.record(self._input_chunksize,
                                               self._input_bits,
                                               self._input_channels,
                                               self._input_rate):
            if keyword_uttered.is_set():
                self._logger.info("Keyword %s has been uttered", keyword)
                return
            frames.append(frame)
            if not recording:
                snr = self._snr([frame])
                if snr >= 10:  # 10dB
                    # Loudness is higher than normal, start recording and use
                    # the last 10 frames to start
                    self._logger.debug("Started recording on device '%s'",
                                       self._input_device.slug)
                    self._logger.debug("Triggered on SNR of %sdB", snr)
                    recording = True
                    recording_frames = list(frames)[-10:]
                elif len(frames) >= frames.maxlen:
                    # Threshold SNR not reached. Update threshold with
                    # background noise.
                    self._threshold = float(audioop.rms("".join(frames), 2))
            else:
                # We're recording
                recording_frames.append(frame)
                if len(recording_frames) > 20:
                    # If we recorded at least 20 frames, check if we're below
                    # threshold again
                    last_snr = self._snr(recording_frames[-10:])
                    self._logger.debug(
                        "Recording's SNR dB: %f", last_snr)
                    if last_snr <= 3 or len(recording_frames) >= 60:
                        # The loudness of the sound is not at least as high as
                        # the the threshold, or we've been waiting too long
                        # we'll stop recording now
                        recording = False
                        self._logger.debug("Recorded %d frames",
                                           len(recording_frames))
                        frame_queue.put(tuple(recording_frames))
                        self._threshold = float(
                            audioop.rms(b"".join(frames), 2))