我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用numpy.fft.rfft()。
def freq_from_HPS(sig, fs): """ Estimate frequency using harmonic product spectrum (HPS) """ windowed = sig * blackmanharris(len(sig)) from pylab import subplot, plot, log, copy, show # harmonic product spectrum: c = abs(rfft(windowed)) maxharms = 3 #subplot(maxharms, 1, 1) #plot(log(c)) for x in range(2, maxharms): a = copy(c[::x]) # Should average or maximum instead of decimating # max(c[::x],c[1::x],c[2::x],...) c = c[:len(a)] i = argmax(abs(c)) true_i = parabolic(abs(c), i)[0] print 'Pass %d: %f Hz' % (x, fs * true_i / len(windowed)) c *= a #subplot(maxharms, 1, x) #plot(log(c)) #show()
def fft(self): self.spectre = fft.rfft(self.wave) self.frequencies = fft.rfftfreq(len(self.wave), 1. / self.framerate) # handle_line, = plt.plot(self.frequences, abs(self.spectre), label=self.start_time) # plt.legend(handles=[handle_line]) # plt.show()
def freq_from_fft(sig, fs): """ Estimate frequency from peak of FFT """ # Compute Fourier transform of windowed signal windowed = sig * blackmanharris(len(sig)) f = rfft(windowed) # Find the peak and interpolate to get a more accurate peak i = argmax(abs(f)) # Just use this for less-accurate, naive version true_i = parabolic(log(abs(f)), i)[0] # Convert to equivalent frequency return fs * true_i / len(windowed)
def single_step_propagation(self): """ Perform single step propagation. The final Wigner function is not normalized. :return: self.wignerfunction """ expV = self.get_expV(self.t) # p x -> theta x self.wignerfunction = fft.rfft(self.wignerfunction, axis=0) self.wignerfunction *= expV # theta x -> p x self.wignerfunction = fft.irfft(self.wignerfunction, axis=0) # p x -> p lambda self.wignerfunction = fft.rfft(self.wignerfunction, axis=1) self.wignerfunction *= self.get_expK(self.t) # p lambda -> p x self.wignerfunction = fft.irfft(self.wignerfunction, axis=1) # p x -> theta x self.wignerfunction = fft.rfft(self.wignerfunction, axis=0) self.wignerfunction *= expV # theta x -> p x self.wignerfunction = fft.irfft(self.wignerfunction, axis=0) # increment current time self.t += self.dt return self.wignerfunction
def single_step_propagation(self): """ Perform single step propagation. The final Wigner function is not normalized. :return: self.wignerfunction """ # p x -> theta x self.wignerfunction = fft.rfft(self.wignerfunction, axis=0) self.wignerfunction *= self.expV # theta x -> p x self.wignerfunction = fft.irfft(self.wignerfunction, axis=0) # p x -> p lambda self.wignerfunction = fft.rfft(self.wignerfunction, axis=1) self.wignerfunction *= self.expK # p lambda -> p x self.wignerfunction = fft.irfft(self.wignerfunction, axis=1) # p x -> theta x self.wignerfunction = fft.rfft(self.wignerfunction, axis=0) self.wignerfunction *= self.expV # theta x -> p x self.wignerfunction = fft.irfft(self.wignerfunction, axis=0) return self.wignerfunction
def theta_slice(self, *args): """ Slice array A listed in args as if they were returned by fft.rfft(A, axis=0) """ return (A[:(1 + self.P_gridDIM//2), :] for A in args)
def interp_helper(all_data, trend_data, time_from): 'performs lf spline + hf fft interpolation of radial distance' all_times, all_values = zip(*all_data) trend_times, trend_values = zip(*trend_data) split_time = int(time_to_index(time_from, all_times[0])) trend_indices = array([time_to_index(item, all_times[0]) for item in trend_times]) spline = splrep(trend_indices, array(trend_values)) all_indices = array([time_to_index(item, all_times[0]) for item in all_times]) trend = splev(all_indices, spline) detrended = array(all_values) - trend trend_add = splev(arange(split_time, all_indices[-1]+1), spline) dense_samples = detrended[:split_time] sparse_samples = detrended[split_time:] sparse_indices = (all_indices[split_time:]-split_time).astype(int) amp = log(absolute(rfft(dense_samples))) dense_freq = rfftfreq(dense_samples.size, 5) periods = (3000.0, 300.0) ind_from = int(round(1/(periods[0]*dense_freq[1]))) ind_to = int(round(1/(periods[1]*dense_freq[1]))) slope, _ = polyfit(log(dense_freq[ind_from:ind_to]), amp[ind_from:ind_to], 1) params = { 't_max': periods[0], 'slope': slope, 'n_harm': 9, 'scale': [20, 4, 2*pi] } series_func, residual_func = make_residual_func(sparse_samples, sparse_indices, **params) x0 = array([0.5]*(params["n_harm"]+2)) bounds = [(0, 1)]*(params["n_harm"]+2) result = minimize(residual_func, x0, method="L-BFGS-B", bounds=bounds, options={'eps':1e-2}) interp_values = [trend + high_freq for trend, high_freq in zip(trend_add, series_func(result.x)[:sparse_indices[-1]+1])] #make_qc_plot(arange(sparse_indices[-1]+1), interp_values, # sparse_indices, array(all_values[split_time:])) interp_times = [index_to_time(ind, time_from) for ind in range(sparse_indices[-1]+1)] return list(zip(interp_times, interp_values))
def run(): p = pyaudio.PyAudio() stream = p.open( format=pyaudio.paInt16, channels=1, # Mono rate=RATE, input=True, frames_per_buffer=CHUNK ) signal = empty(FFT_LEN, dtype=int16) try: # Disable cursor sys.stdout.write('\033[?25l') while 1: # Roll in new frame into buffer try: frame = stream.read(CHUNK) except IOError as e: if e[1] != pyaudio.paInputOverflowed: raise continue signal = roll(signal, -CHUNK) signal[-CHUNK:] = fromstring(frame, dtype=int16) # Now transform! try: fftspec = list(log(abs(x) * SIGNAL_SCALE) + 1.5 for x in rfft(signal)[:CHUNK*3]) except ValueError: fftspec = [0] * CHUNK * 3 # Print it lines = [ ''.join(spark(x - i+1, x) for x in fftspec) for i in range(HEIGHT, 0, -1) ] sys.stdout.write('?' + '?\n?'.join(lines) + '?') sys.stdout.write('\033[3A\r') except KeyboardInterrupt: sys.stdout.write('\n' * HEIGHT) finally: # Turn the cursor back on sys.stdout.write('\033[?25h')
def stft(time_signal, time_dim=None, size=1024, shift=256, window=signal.blackman, fading=True, window_length=None): """ Calculates the short time Fourier transformation of a multi channel multi speaker time signal. It is able to add additional zeros for fade-in and fade out and should yield an STFT signal which allows perfect reconstruction. :param time_signal: multi channel time signal. :param time_dim: Scalar dim of time. Default: None means the biggest dimension :param size: Scalar FFT-size. :param shift: Scalar FFT-shift. Typically shift is a fraction of size. :param window: Window function handle. :param fading: Pads the signal with zeros for better reconstruction. :param window_length: Sometimes one desires to use a shorter window than the fft size. In that case, the window is padded with zeros. The default is to use the fft-size as a window size. :return: Single channel complex STFT signal with dimensions frames times size/2+1. """ if time_dim is None: time_dim = np.argmax(time_signal.shape) # Pad with zeros to have enough samples for the window function to fade. if fading: pad = [(0, 0)] * time_signal.ndim pad[time_dim] = [size - shift, size - shift] time_signal = np.pad(time_signal, pad, mode='constant') # Pad with trailing zeros, to have an integral number of frames. frames = _samples_to_stft_frames(time_signal.shape[time_dim], size, shift) samples = _stft_frames_to_samples(frames, size, shift) pad = [(0, 0)] * time_signal.ndim pad[time_dim] = [0, samples - time_signal.shape[time_dim]] time_signal = np.pad(time_signal, pad, mode='constant') if window_length is None: window = window(size) else: window = window(window_length) window = np.pad(window, (0, size - window_length), mode='constant') time_signal_seg = segment_axis(time_signal, size, size - shift, axis=time_dim) letters = string.ascii_lowercase mapping = letters[:time_signal_seg.ndim] + ',' + letters[time_dim + 1] \ + '->' + letters[:time_signal_seg.ndim] return rfft(np.einsum(mapping, time_signal_seg, window), axis=time_dim + 1)
def stft(time_signal, time_dim=None, size=1024, shift=256, window=signal.blackman, fading=True, window_length=None): """ Calculates the short time Fourier transformation of a multi channel multi speaker time signal. It is able to add additional zeros for fade-in and fade out and should yield an STFT signal which allows perfect reconstruction. :param time_signal: multi channel time signal. :param time_dim: Scalar dim of time. Default: None means the biggest dimension :param size: Scalar FFT-size. :param shift: Scalar FFT-shift. Typically shift is a fraction of size. :param window: Window function handle. :param fading: Pads the signal with zeros for better reconstruction. :param window_length: Sometimes one desires to use a shorter window than the fft size. In that case, the window is padded with zeros. The default is to use the fft-size as a window size. :return: Single channel complex STFT signal with dimensions frames times size/2+1. """ if time_dim is None: time_dim = np.argmax(time_signal.shape) ''' # Pad with zeros to have enough samples for the window function to fade. if fading: pad = [(0, 0)] * time_signal.ndim pad[time_dim] = [size - shift, size - shift] time_signal = np.pad(time_signal, pad, mode='constant') ''' # Pad with trailing zeros, to have an integral number of frames. frames = _samples_to_stft_frames(time_signal.shape[time_dim], size, shift) samples = _stft_frames_to_samples(frames, size, shift) pad = [(0, 0)] * time_signal.ndim pad[time_dim] = [0, samples - time_signal.shape[time_dim]] time_signal = np.pad(time_signal, pad, mode='constant') if window_length is None: window = window(size) else: window = window(window_length) window = np.pad(window, (0, size - window_length), mode='constant') time_signal_seg = segment_axis(time_signal, size, size - shift, axis=time_dim) letters = string.ascii_lowercase mapping = letters[:time_signal_seg.ndim] + ',' + letters[time_dim + 1] \ + '->' + letters[:time_signal_seg.ndim] return rfft(np.einsum(mapping, time_signal_seg, window), axis=time_dim + 1)
def single_step_propagation(self): """ Perform single step propagation. The final Wigner functions are not normalized. """ ################ p x -> theta x ################ self.wigner_ge = fftpack.fft(self.wigner_ge, axis=0, overwrite_x=True) self.wigner_g = fftpack.fft(self.wigner_g, axis=0, overwrite_x=True) self.wigner_e = fftpack.fft(self.wigner_e, axis=0, overwrite_x=True) # Construct T matricies TgL, TgeL, TeL = self.get_T_left(self.t) TgR, TgeR, TeR = self.get_T_right(self.t) # Save previous version of the Wigner function Wg, Wge, We = self.wigner_g, self.wigner_ge, self.wigner_e # First update the complex valued off diagonal wigner function self.wigner_ge = (TgL*Wg + TgeL*Wge.conj())*TgeR + (TgL*Wge + TgeL*We)*TeR # Slice arrays to employ the symmetry (savings in speed) TgL, TgeL, TeL = self.theta_slice(TgL, TgeL, TeL) TgR, TgeR, TeR = self.theta_slice(TgR, TgeR, TeR) Wg, Wge, We = self.theta_slice(Wg, Wge, We) # Calculate the remaning real valued Wigner functions self.wigner_g = (TgL*Wg + TgeL*Wge.conj())*TgR + (TgL*Wge + TgeL*We)*TgeR self.wigner_e = (TgeL*Wg + TeL*Wge.conj())*TgeR + (TgeL*Wge + TeL*We)*TeR ################ Apply the phase factor ################ self.wigner_ge *= self.expV self.wigner_g *= self.expV[:(1 + self.P_gridDIM//2), :] self.wigner_e *= self.expV[:(1 + self.P_gridDIM//2), :] ################ theta x -> p x ################ self.wigner_ge = fftpack.ifft(self.wigner_ge, axis=0, overwrite_x=True) self.wigner_g = fft.irfft(self.wigner_g, axis=0) self.wigner_e = fft.irfft(self.wigner_e, axis=0) ################ p x -> p lambda ################ self.wigner_ge = fftpack.fft(self.wigner_ge, axis=1, overwrite_x=True) self.wigner_g = fft.rfft(self.wigner_g, axis=1) self.wigner_e = fft.rfft(self.wigner_e, axis=1) ################ Apply the phase factor ################ self.wigner_ge *= self.expK self.wigner_g *= self.expK[:, :(1 + self.X_gridDIM//2)] self.wigner_e *= self.expK[:, :(1 + self.X_gridDIM//2)] ################ p lambda -> p x ################ self.wigner_ge = fftpack.ifft(self.wigner_ge, axis=1, overwrite_x=True) self.wigner_g = fft.irfft(self.wigner_g, axis=1) self.wigner_e = fft.irfft(self.wigner_e, axis=1) #self.normalize_wigner_matrix()