我们从Python开源项目中,提取了以下27个代码示例,用于说明如何使用audioop.mul()。
def test_issue7673(self):
    """Every audioop entry point must raise audioop.error for invalid
    (data, size) pairs instead of crashing (regression test for #7673)."""
    state = None
    for data, size in INVALID_DATA:
        size2 = size
        # Same calls, same order as before — expressed as a dispatch table.
        invalid_calls = [
            (audioop.getsample, (data, size, 0)),
            (audioop.max, (data, size)),
            (audioop.minmax, (data, size)),
            (audioop.avg, (data, size)),
            (audioop.rms, (data, size)),
            (audioop.avgpp, (data, size)),
            (audioop.maxpp, (data, size)),
            (audioop.cross, (data, size)),
            (audioop.mul, (data, size, 1.0)),
            (audioop.tomono, (data, size, 0.5, 0.5)),
            (audioop.tostereo, (data, size, 0.5, 0.5)),
            (audioop.add, (data, data, size)),
            (audioop.bias, (data, size, 0)),
            (audioop.reverse, (data, size)),
            (audioop.lin2lin, (data, size, size2)),
            (audioop.ratecv, (data, size, 1, 1, 1, state)),
            (audioop.lin2ulaw, (data, size)),
            (audioop.lin2alaw, (data, size)),
            (audioop.lin2adpcm, (data, size, state)),
        ]
        for func, args in invalid_calls:
            self.assertRaises(audioop.error, func, *args)
def test_mul(self):
    """audioop.mul scales fragments and saturates at each width's
    signed min/max."""
    for w in 1, 2, 3, 4:
        # Any empty buffer flavor maps to empty output.
        for empty in (b'', bytearray(), memoryview(b'')):
            self.assertEqual(audioop.mul(empty, w, 2), b'')
        # Factor 0 silences the fragment; factor 1 is the identity.
        self.assertEqual(audioop.mul(datas[w], w, 0), b'\0' * len(datas[w]))
        self.assertEqual(audioop.mul(datas[w], w, 1), datas[w])
    # Factor 2 doubles each sample, clamping at the signed bounds.
    self.assertEqual(audioop.mul(datas[1], 1, 2),
                     b'\x00\x24\x7f\x80\x7f\x80\xfe')
    self.assertEqual(audioop.mul(datas[2], 2, 2),
                     packs[2](0, 0x2468, 0x7fff, -0x8000,
                              0x7fff, -0x8000, -2))
    self.assertEqual(audioop.mul(datas[3], 3, 2),
                     packs[3](0, 0x2468ac, 0x7fffff, -0x800000,
                              0x7fffff, -0x800000, -2))
    self.assertEqual(audioop.mul(datas[4], 4, 2),
                     packs[4](0, 0x2468acf0, 0x7fffffff, -0x80000000,
                              0x7fffffff, -0x80000000, -2))
def test_string(self): data = 'abcd' size = 2 self.assertRaises(TypeError, audioop.getsample, data, size, 0) self.assertRaises(TypeError, audioop.max, data, size) self.assertRaises(TypeError, audioop.minmax, data, size) self.assertRaises(TypeError, audioop.avg, data, size) self.assertRaises(TypeError, audioop.rms, data, size) self.assertRaises(TypeError, audioop.avgpp, data, size) self.assertRaises(TypeError, audioop.maxpp, data, size) self.assertRaises(TypeError, audioop.cross, data, size) self.assertRaises(TypeError, audioop.mul, data, size, 1.0) self.assertRaises(TypeError, audioop.tomono, data, size, 0.5, 0.5) self.assertRaises(TypeError, audioop.tostereo, data, size, 0.5, 0.5) self.assertRaises(TypeError, audioop.add, data, data, size) self.assertRaises(TypeError, audioop.bias, data, size, 0) self.assertRaises(TypeError, audioop.reverse, data, size) self.assertRaises(TypeError, audioop.lin2lin, data, size, size) self.assertRaises(TypeError, audioop.ratecv, data, size, 1, 1, 1, None) self.assertRaises(TypeError, audioop.lin2ulaw, data, size) self.assertRaises(TypeError, audioop.lin2alaw, data, size) self.assertRaises(TypeError, audioop.lin2adpcm, data, size, None)
def _write_frames_to_file(self, frames, framerate, volume): with tempfile.NamedTemporaryFile(mode='w+b') as f: wav_fp = wave.open(f, 'wb') wav_fp.setnchannels(self._input_channels) wav_fp.setsampwidth(int(self._input_bits / 8)) wav_fp.setframerate(framerate) if self._input_rate == framerate: fragment = ''.join(frames) else: fragment = audioop.ratecv(''.join(frames), int(self._input_bits / 8), self._input_channels, self._input_rate, framerate, None)[0] if volume is not None: maxvolume = audioop.minmax(fragment, self._input_bits / 8)[1] fragment = audioop.mul( fragment, int(self._input_bits / 8), volume * (2. ** 15) / maxvolume) wav_fp.writeframes(fragment) wav_fp.close() f.seek(0) yield f
def test_mul(self):
    """audioop.mul with factor 2 doubles every sample of the test data.

    Builds the expected fragments by doubling each byte value, then
    checks the 1-, 2- and 4-byte-wide fragments.
    """
    doubled = []
    for d in data:
        # FIX: the expected-result buffer was named `str`, shadowing the
        # builtin; renamed for clarity.
        buf = bytearray(len(d))
        for i, b in enumerate(d):
            buf[i] = 2 * b
        doubled.append(buf)
    self.assertEqual(audioop.mul(data[0], 1, 2), doubled[0])
    self.assertEqual(audioop.mul(data[1], 2, 2), doubled[1])
    self.assertEqual(audioop.mul(data[2], 4, 2), doubled[2])
def run(self):
    """Playback loop: pop queued PCM frames and feed them to the player
    at a fixed cadence of ``self.delay`` seconds per frame.

    Runs until ``self._end`` is set (or the connection drops), honoring
    pause/resume via the ``self._resumed`` event.
    """
    self.loops = 0
    self._start = time.time()
    while not self._end.is_set():
        # are we paused?
        if not self._resumed.is_set():
            # wait until we aren't
            self._resumed.wait()

        # Connection gone: stop the player and bail out of the loop.
        if not self._connected.is_set():
            self.stop()
            break

        self.loops += 1
        data = self.data.pop(0)
        if self._volume != 1.0:
            # Scale 16-bit-wide samples; the volume factor is capped at 2.0.
            data = audioop.mul(data, 2, min(self._volume, 2.0))

        # NOTE(review): the string literal below is disabled code kept as a
        # no-op statement — either delete it or restore it as a real check.
        """if len(data) != self.frame_size:
            self.stop()
            break"""
        self.player(data)
        # Schedule the next frame relative to the loop start so timing
        # drift does not accumulate.
        next_time = self._start + self.delay * self.loops
        delay = max(0, self.delay + (next_time - time.time()))
        time.sleep(delay)
def test_mul(self):
    """audioop.mul scales fragments and clamps at each width's signed
    min/max (widths 1, 2 and 4)."""
    for w in 1, 2, 4:
        self.assertEqual(audioop.mul(b'', w, 2), b'')
        # Factor 0 zeroes the fragment; factor 1 leaves it untouched.
        self.assertEqual(audioop.mul(datas[w], w, 0), b'\0' * len(datas[w]))
        self.assertEqual(audioop.mul(datas[w], w, 1), datas[w])
    # Factor 2 doubles and saturates at the signed bounds of each width.
    expected = {
        1: b'\x00\x24\x7f\x80\x7f\x80\xfe',
        2: packs[2](0, 0x2468, 0x7fff, -0x8000, 0x7fff, -0x8000, -2),
        4: packs[4](0, 0x2468acf0, 0x7fffffff, -0x80000000,
                    0x7fffffff, -0x80000000, -2),
    }
    for w, result in expected.items():
        self.assertEqual(audioop.mul(datas[w], w, 2), result)
def read(self):
    """Read one chunk of 16-bit PCM from the wrapped source and return it
    scaled by the current volume."""
    return audioop.mul(self.original.read(), 2, self._volume)
def loop(self):
    """Main playback loop: stream decoded audio into mumble until
    ``self.exit`` is set, then drain the remaining buffered sound.

    While playing, waits for the output buffer to fall below half a
    second before pushing the next chunk of volume-scaled 16-bit PCM
    read from the decoder subprocess's stdout.
    """
    while not self.exit:
        if self.playing:
            # Throttle: let the output buffer drain below 0.5s of audio
            # before queueing more.
            while self.mumble.sound_output.get_buffer_size() > 0.5 and self.playing:
                time.sleep(0.01)
            # 480 bytes per chunk from the decoder — presumably 240
            # 16-bit mono samples; TODO confirm the frame size.
            self.mumble.sound_output.add_sound(audioop.mul(self.thread.stdout.read(480), 2, self.volume))
        else:
            # Idle: poll for playback to start.
            time.sleep(1)

    # Exiting: wait for everything already queued to play out.
    while self.mumble.sound_output.get_buffer_size() > 0:
        time.sleep(0.01)
    time.sleep(0.5)
def _frame_vol(self, frame, mult, *, maxv=2, use_audioop=True): if use_audioop: return audioop.mul(frame, 2, min(mult, maxv)) else: # ffmpeg returns s16le pcm frames. frame_array = array('h', frame) for i in range(len(frame_array)): frame_array[i] = int(frame_array[i] * min(mult, min(1, maxv))) return frame_array.tobytes()
def get_32bit_frames(self, scale_amplitude=True):
    """Return the raw sample frames widened to 32 bits.

    See the make_32bit method for more info.  When *scale_amplitude* is
    False, the amplitude is divided back down so the values still fit
    the original 8/16/24-bit range.
    """
    if self.samplewidth == 4:
        # Already 32-bit: nothing to convert.
        return self.__frames
    widened = audioop.lin2lin(self.__frames, self.samplewidth, 4)
    if not scale_amplitude:
        # Scale the amplitude back so it fits the original bit range.
        shrink = 1.0 / 2 ** (8 * abs(self.samplewidth - 4))
        widened = audioop.mul(widened, 4, shrink)
    return widened
def amplify_max(self):
    """Scale the sample up to (just under) full scale, without clipping
    or overflow, and return self for chaining."""
    assert not self.__locked
    peak = audioop.max(self.__frames, self.samplewidth)
    # Target just below the signed maximum for this sample width.
    ceiling = 2 ** (8 * self.samplewidth - 1) - 2
    if peak > 0:
        self.__frames = audioop.mul(self.__frames, self.samplewidth,
                                    ceiling / peak)
    return self
def amplify(self, factor):
    """Multiply every sample by *factor* and return self for chaining.

    May cause clipping/overflow if *factor* is too large.
    """
    assert not self.__locked
    scaled = audioop.mul(self.__frames, self.samplewidth, factor)
    self.__frames = scaled
    return self
def draw_wave(self, stream, force=False):
    """Render *stream*'s waveform into the scene as a min/max envelope
    path, one column per 64 source frames.

    Results are cached per stream; pass ``force=True`` to redraw
    ignoring cache centering.

    NOTE(review): this is Python-2 era code (``xrange``, integer-division
    assumptions, ``readframes(float('inf'))``) with bare excepts —
    confirm intent before porting.
    """
    # print stream.getnframes()
    if self.wavepath:
        # Remove the previously drawn path before redrawing.
        self.scene().removeItem(self.wavepath)
    self.fitInView(0, 0, 1, 1)
    self.current_sampwidth = sampwidth = stream.getsampwidth()
    # delta = 2**(8*width): full unsigned range of one sample, used as
    # the vertical extent of the drawing.
    self.current_sampwidth_int = delta = 2**(8*sampwidth)
    if stream in self.cache and not force:
        self.current_data, wavepath = self.cache[stream]
    else:
        stream.rewind()
        frames = stream.getnframes()
        # One envelope column per 64 frames (py2 integer division
        # assumed, since ratio feeds xrange below — TODO confirm).
        ratio = frames / 64
        if stream.getnchannels() == 2:
            # Mix stereo down to mono using the UI's left/right balance.
            data = audioop.tomono(stream.readframes(float('inf')), sampwidth, self.main.left_spin.value(), self.main.right_spin.value())
        else:
            data = stream.readframes(float('inf'))
        # Apply the global gain setting.
        data = audioop.mul(data, sampwidth, self.main.gain)
        self.current_data = data
        wavepath = QtGui.QPainterPath()
        try:
            for frame_set in xrange(ratio):
                frame_min = frame_max = 0
                # Scan 64 samples for this column's min/max envelope.
                for frame in xrange(64):
                    try:
                        value = audioop.getsample(data, sampwidth, frame + frame_set * 64)
                        frame_min = min(frame_min, value)
                        frame_max = max(frame_max, value)
                    except:
                        # Ran past the end of the fragment.
                        break
                if frame == 0:
                    # First sample of the column already failed: no data left.
                    break
                # Draw a vertical segment spanning the column's envelope.
                wavepath.moveTo(frame_set, delta - frame_min)
                wavepath.lineTo(frame_set, delta - frame_max)
        except:
            # NOTE(review): bare except silently abandons drawing — narrow it.
            pass
        self.cache[stream] = data, wavepath
    self.wavepath = self.scene().addPath(wavepath)
    self.wavepath.setPen(self.wave_pen)
    # Center the path vertically around the x axis.
    self.wavepath.setY(-delta * .5)
    self.wavepath.setX(self.left_margin*2)
    self.fitInView(0, 0, self.zoom_values[self.zoom], delta)
    if not force:
        self.centerOn(self.wavepath)
    self.right_margin_item.setX(len(self.current_data)/self.current_sampwidth/64)
    visible = self.mapToScene(self.viewport().rect()).boundingRect()
    if visible.width() > self.wavepath.boundingRect().width():
        # Keep the scene at least as wide as the viewport.
        self.scene().setSceneRect(-self.left_margin, 0, visible.width(), delta)
    else:
        self.scene().setSceneRect(-self.left_margin, 0, self.wavepath.boundingRect().width(), delta)