我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用scipy.fftpack.fft()。
def plot_deriv_fft(df): # Number of samplepoints N = len(df) # sample spacing T = BINSIZE*60.0 x = np.linspace(0.0, N*T, N-1) y = [df.values[i]-df.values[i-1] for i in range(1,len(df.values))] #np.sin(50.0 * 2.0*np.pi*x) + 0.5*np.sin(80.0 * 2.0*np.pi*x) yf = fftpack.fft(y) xf = np.linspace(0.0, 1.0/(2.0*T), N/2) fig, ax = plt.subplots() ax.plot(x, y) plt.savefig(FIG_DIR+'deriv.png', bbox_inches='tight') fig, ax = plt.subplots() ax.plot(xf, 2.0/N * np.abs(yf[:N//2])) plt.savefig(FIG_DIR+'fft_deriv.png', bbox_inches='tight')
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False, compute_onesided=True): """ Compute STFT for 1D real valued input X """ if real: local_fft = fftpack.rfft cut = -1 else: local_fft = fftpack.fft cut = None if compute_onesided: cut = fftsize // 2 + 1 if mean_normalize: X -= X.mean() if step == "half": X = halfoverlap(X, fftsize) else: X = overlap(X, fftsize, step) size = fftsize win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1)) X = X * win[None] X = local_fft(X)[:, :cut] return X
def cheaptrick_get_power_spectrum(waveform, fs, fft_size, f0): power_spectrum = np.abs(np.fft.fft(waveform, fft_size)) ** 2 frequency_axis = np.arange(fft_size) / float(fft_size) * float(fs) ind = frequency_axis < (f0 + fs / fft_size) low_frequency_axis = frequency_axis[ind] low_frequency_replica = interp1d(f0 - low_frequency_axis, power_spectrum[ind], kind="linear", fill_value="extrapolate")(low_frequency_axis) p1 = low_frequency_replica[(frequency_axis < f0)[:len(low_frequency_replica)]] p2 = power_spectrum[(frequency_axis < f0)[:len(power_spectrum)]] power_spectrum[frequency_axis < f0] = p1 + p2 lb1 = int(fft_size / 2) + 1 lb2 = 1 ub2 = int(fft_size / 2) power_spectrum[lb1:] = power_spectrum[lb2:ub2][::-1] return power_spectrum
def d4c_love_train(x, fs, current_f0, current_position, threshold): vuv = 0 if current_f0 == 0: return vuv lowest_f0 = 40 current_f0 = max([current_f0, lowest_f0]) fft_size = int(2 ** np.ceil(np.log2(3. * fs / lowest_f0 + 1))) boundary0 = int(np.ceil(100 / (float(fs) / fft_size))) boundary1 = int(np.ceil(4000 / (float(fs) / fft_size))) boundary2 = int(np.ceil(7900 / (float(fs) / fft_size))) waveform = d4c_get_windowed_waveform(x, fs, current_f0, current_position, 1.5, 2) power_spectrum = np.abs(np.fft.fft(waveform, int(fft_size)) ** 2) power_spectrum[0:boundary0 + 1] = 0. cumulative_spectrum = np.cumsum(power_spectrum) if (cumulative_spectrum[boundary1] / cumulative_spectrum[boundary2]) > threshold: vuv = 1 return vuv
def win2mgc(windowed_signal, order=20, alpha=0.35, gamma=-0.41, miniter=2, maxiter=30, criteria=0.001, otype=0, verbose=False): """ Accepts 1D or 2D array of windowed signal frames. If 2D, assumes time is axis 0. Returns mel generalized cepstral coefficients. Based on r9y9 Julia code https://github.com/r9y9/MelGeneralizedCepstrums.jl """ if len(windowed_signal.shape) == 1: sp = np.fft.fft(windowed_signal) return _sp2mgc(sp, order=order, alpha=alpha, gamma=gamma, miniter=miniter, maxiter=maxiter, criteria=criteria, otype=otype, verbose=verbose) else: raise ValueError("2D input not yet complete for win2mgc")
def run_mgc_example(): import matplotlib.pyplot as plt fs, x = wavfile.read("test16k.wav") pos = 3000 fftlen = 1024 win = np.blackman(fftlen) / np.sqrt(np.sum(np.blackman(fftlen) ** 2)) xw = x[pos:pos + fftlen] * win sp = 20 * np.log10(np.abs(np.fft.rfft(xw))) mgc_order = 20 mgc_alpha = 0.41 mgc_gamma = -0.35 mgc_arr = win2mgc(xw, order=mgc_order, alpha=mgc_alpha, gamma=mgc_gamma, verbose=True) xwsp = 20 * np.log10(np.abs(np.fft.rfft(xw))) sp = mgc2sp(mgc_arr, mgc_alpha, mgc_gamma, fftlen) plt.plot(xwsp) plt.plot(20. / np.log(10) * np.real(sp), "r") plt.xlim(1, len(xwsp)) plt.show()
def carr_madan_fraction_fft_call_pricer(N, d_u, d_k, alpha, r, t, S0, q, chf_ln_st): rou = (d_u * d_k) / (2 * np.pi) beta = np.log(S0) - d_k * N / 2 u_arr = np.arange(N) * d_u k_arr = beta + np.arange(N) * d_k delta_arr = np.zeros(N) delta_arr[0] = 1 w_arr = d_u / 3 * (3 + (-1) ** (np.arange(N) + 1) - delta_arr) call_chf = (np.exp(-r * t) / ((alpha + 1j * u_arr) * (alpha + 1j * u_arr + 1))) * chf_ln_st( u_arr - (alpha + 1) * 1j, t, r, q=q, S0=S0) x_arr = np.exp(-1j * beta * u_arr) * call_chf * w_arr y_arr = np.zeros(2 * N) * 0j y_arr[:N] = np.exp(-1j * np.pi * rou * np.arange(N) ** 2) * x_arr z_arr = np.zeros(2 * N) * 0j z_arr[:N] = np.exp(1j * np.pi * rou * np.arange(N) ** 2) z_arr[N:] = np.exp(1j * np.pi * rou * np.arange(N - 1, -1, -1) ** 2) ffty = (fft(y_arr)) fftz = (fft(z_arr)) fftx = ffty * fftz fftpsi = ifft(fftx) fft_prices = np.exp(-1j * np.pi * (np.arange(N) ** 2) * rou) * fftpsi[:N] call_prices = (np.exp(-alpha * k_arr) / np.pi) * fft_prices.real return np.exp(k_arr), call_prices
def compute_zr(self): """ Return z(r) matrix """ r = np.array([i*self.dr for i in range(self.ngrid)]) k, zk = self.compute_zk() print 'computed zk',zk.shape zr = [["" for i in range(self.nsites)] for j in range(self.nsites)] for i in range(self.nsites): for j in range(self.nsites): zk_ij = zk[1:,i,j] zr_ij = pubfft.sinfti(zk_ij*k[1:], self.dr, -1)/r[1:] #zr_ij = np.abs(fftpack.fft(zk_ij)) n_pots_for_interp = 6 r_for_interp = r[1:n_pots_for_interp+1] zr_for_interp = zr_ij[:n_pots_for_interp] poly_coefs = np.polyfit(r_for_interp, zr_for_interp, 3) poly_f = np.poly1d(poly_coefs) zr[i][j] = [poly_f(0)] zr[i][j].extend(zr_ij) return r, np.swapaxes(zr, 0, 2)
def _lpc(x,order): """Linear predictor coefficients. Supports 1D and 2D arrays only through iteration.""" x = np.asarray(x) if x.ndim == 1: m = x.size X = fft(x,n=nextpow2(m)) # TODO nextpower of 2 R = np.real(ifft(np.abs(X)**2)) # Auto-correlation matrix R = R/m a = _levinson(r=R, order=order)[0] a = np.hstack((1., a)) elif x.ndim == 2: m,n = x.shape X = fft(x,n=nextpow2(m), axis=0) R = np.real(ifft(np.abs(X)**2, axis=0)) # Auto-correlation matrix R = R/m a = np.ones((order+1,n)) for col in range(n): a[1:,col] = _levinson(r=R[:,col], order=order)[0] else: raise ValueError('Supported for 1-D or 2-D arrays only.') return a
def test_high_frequency_completion(self): path = dirpath + '/data/test16000.wav' fs, x = wavfile.read(path) f0rate = 0.5 shifter = Shifter(fs, f0rate=f0rate) mod_x = shifter.f0transform(x, completion=False) mod_xc = shifter.f0transform(x, completion=True) assert len(mod_x) == len(mod_xc) N = 512 fl = int(fs * 25 / 1000) win = np.hanning(fl) sts = [1000, 5000, 10000, 20000] for st in sts: # confirm w/o completion f_mod_x = fft(mod_x[st: st + fl] / 2**16 * win) amp_mod_x = 20.0 * np.log10(np.abs(f_mod_x)) # confirm w/ completion f_mod_xc = fft(mod_xc[st: st + fl] / 2**16 * win) amp_mod_xc = 20.0 * np.log10(np.abs(f_mod_xc)) assert np.mean(amp_mod_x[N // 4:] < np.mean(amp_mod_xc[N // 4:]))
def compare_fft(date, x, file_name='comparefft.jpg'): from scipy.fftpack import fft, ifft ax = [] l = len(x) t = 50 x = exp_moving_average(x, 10) labels = [] ax.append(x[t:l-t]) labels.append("??????? ???") ax.append((fft(x)[t:l-t])) labels.append("???’? ????????????") # print(l, len(np.fft.ifft(np.fft.fft(x)[5:l-5])[5:l-5])) print(len(x)) # print(len(ifft(np.log(np.abs(fft(x)[5:l-5])))[5:l-5])) # ax.append((ifft(np.log(np.abs(fft(x))))[t:l-t])[:l/2 - t]) # at.append((date[t:l-t])[::2]) print("---") showPlotLabelsCompare(ax, date[t:l-t], labels, file_name)
def compare_cepstrum(date, x, file_name='comparecep.jpg'): # date, x = hist_data.get_historical_quotes(start_date=datetime.datetime(2000, 2, 2), # end_date=datetime.datetime(2001, 2, 2), # csv_path='../../data/usdgbp1990.csv') from scipy.fftpack import fft, ifft ax = [] at = [] l = len(x) t = 50 x = exp_moving_average(x, 10) ax.append(x[t:l-t]) at.append(date[t:l-t]) # print(l, len(np.fft.ifft(np.fft.fft(x)[5:l-5])[5:l-5])) print(len(x)) # print(len(ifft(np.log(np.abs(fft(x)[5:l-5])))[5:l-5])) ax.append((ifft(np.log(np.abs(fft(x))))[t:l-t])[:l/2 - t]) at.append((date[t:l-t])[::2]) print("---") showPlotCompareSeparate(ax, at, file_name)
def quad_fourier(filtered_enb): """ Integrate fft function for fixed period filtered_enb - info about entropy blocks (size, entropy, shift) """ x = np.linspace(-pi_range, pi_range) y = 0.0 period_length = 10000.0 if len(filtered_enb) == 1 and filtered_enb[0][2] == 8.0: return 0, 0 for i in range(len(filtered_enb)): m = 1 if i % 2 != 0: m = -1 block_length = abs(filtered_enb[i][1] - filtered_enb[i][0]) if block_length == 0: continue T = block_length / period_length y += m * filtered_enb[i][2] * \ np.sin(x * ((2 * np.pi) / T) + block_length / period_length) yf = fft(y) return integrate.quad(lambda a: yf[a], -pi_range, pi_range)
def test_savgol_filter(): """Test savgol filtering """ h_freq = 10. raw, events = _get_data()[:2] epochs = Epochs(raw, events, event_id, tmin, tmax) assert_raises(RuntimeError, epochs.savgol_filter, 10.) epochs = Epochs(raw, events, event_id, tmin, tmax, preload=True) freqs = fftpack.fftfreq(len(epochs.times), 1. / epochs.info['sfreq']) data = np.abs(fftpack.fft(epochs.get_data())) match_mask = np.logical_and(freqs >= 0, freqs <= h_freq / 2.) mismatch_mask = np.logical_and(freqs >= h_freq * 2, freqs < 50.) epochs.savgol_filter(h_freq) data_filt = np.abs(fftpack.fft(epochs.get_data())) # decent in pass-band assert_allclose(np.mean(data[:, :, match_mask], 0), np.mean(data_filt[:, :, match_mask], 0), rtol=1e-4, atol=1e-2) # suppression in stop-band assert_true(np.mean(data[:, :, mismatch_mask]) > np.mean(data_filt[:, :, mismatch_mask]) * 5)
def test_savgol_filter(): """Test savgol filtering """ h_freq = 10. evoked = read_evokeds(fname, 0) freqs = fftpack.fftfreq(len(evoked.times), 1. / evoked.info['sfreq']) data = np.abs(fftpack.fft(evoked.data)) match_mask = np.logical_and(freqs >= 0, freqs <= h_freq / 2.) mismatch_mask = np.logical_and(freqs >= h_freq * 2, freqs < 50.) assert_raises(ValueError, evoked.savgol_filter, evoked.info['sfreq']) evoked_sg = evoked.copy().savgol_filter(h_freq) data_filt = np.abs(fftpack.fft(evoked_sg.data)) # decent in pass-band assert_allclose(np.mean(data[:, match_mask], 0), np.mean(data_filt[:, match_mask], 0), rtol=1e-4, atol=1e-2) # suppression in stop-band assert_true(np.mean(data[:, mismatch_mask]) > np.mean(data_filt[:, mismatch_mask]) * 5) # original preserved assert_allclose(data, np.abs(fftpack.fft(evoked.data)), atol=1e-16)
def _st_power_itc(x, start_f, compute_itc, zero_pad, decim, W): """Aux function""" n_samp = x.shape[-1] n_out = (n_samp - zero_pad) n_out = n_out // decim + bool(n_out % decim) psd = np.empty((len(W), n_out)) itc = np.empty_like(psd) if compute_itc else None X = fftpack.fft(x) XX = np.concatenate([X, X], axis=-1) for i_f, window in enumerate(W): f = start_f + i_f ST = fftpack.ifft(XX[:, f:f + n_samp] * window) TFR = ST[:, :-zero_pad:decim] TFR_abs = np.abs(TFR) if compute_itc: TFR /= TFR_abs itc[i_f] = np.abs(np.mean(TFR, axis=0)) TFR_abs *= TFR_abs psd[i_f] = np.mean(TFR_abs, axis=0) return psd, itc
def _check_method(method, iir_params, extra_types): """Helper to parse method arguments""" allowed_types = ['iir', 'fft'] + extra_types if not isinstance(method, string_types): raise TypeError('method must be a string') if method not in allowed_types: raise ValueError('method must be one of %s, not "%s"' % (allowed_types, method)) if method == 'iir': if iir_params is None: iir_params = dict(order=4, ftype='butter') if not isinstance(iir_params, dict): raise ValueError('iir_params must be a dict') elif iir_params is not None: raise ValueError('iir_params must be None if method != "iir"') method = method.lower() return iir_params
def cut_norm(full_y, dt, area=1.0): """Cut out an FFT spectrum from scipy.fftpack.fft() (or numpy.fft.fft()) result and normalize the integral `int(f) y(f) df = area`. full_y : 1d array Result of fft(...) dt : time step area : integral area """ full_faxis = np.fft.fftfreq(full_y.shape[0], dt) split_idx = full_faxis.shape[0]/2 y_out = full_y[:split_idx] faxis = full_faxis[:split_idx] return faxis, num.norm_int(y_out, faxis, area=area) ############################################################################### # common settings for 1d and 3d case ###############################################################################
def test_pdos_1d(): pad=lambda x: pad_zeros(x, nadd=len(x)-1) n=500; w=welch(n) # 1 second signal t=np.linspace(0,1,n); dt=t[1]-t[0] # sum of sin()s with random freq and phase shift, 10 frequencies from # f=0...100 Hz v=np.array([np.sin(2*np.pi*f*t + rand()*2*np.pi) for f in rand(10)*100]).sum(0) f=np.fft.fftfreq(2*n-1, dt)[:n] c1=mirror(ifft(abs(fft(pad(v)))**2.0)[:n].real) c2=correlate(v,v,'full') c3=mirror(acorr(v,norm=False)) assert np.allclose(c1, c2) assert np.allclose(c1, c3) p1=(abs(fft(pad(v)))**2.0)[:n] p2=(abs(fft(mirror(acorr(v,norm=False)))))[:n] assert np.allclose(p1, p2) p1=(abs(fft(pad(v*w)))**2.0)[:n] p2=(abs(fft(mirror(acorr(v*w,norm=False)))))[:n] assert np.allclose(p1, p2)
def fft_1d_loop(arr, axis=-1): """Like scipy.fft.pack.fft and numpy.fft.fft, perform fft along an axis. Here do this by looping over remaining axes and perform 1D FFTs. This was implemented as a low-memory version like :func:`~pwtools.crys.smooth` to be used in :func:`~pwtools.pydos.pdos`, which fills up the memory for big MD data. But actually it has the same memory footprint as the plain scipy fft routine. Keep it here anyway as a nice reference for how to loop over remaining axes in the ndarray case. """ if axis < 0: axis = arr.ndim - 1 axes = [ax for ax in range(arr.ndim) if ax != axis] # tuple here is 3x faster than generator expression # idxs = (range(arr.shape[ax]) for ax in axes) idxs = tuple(range(arr.shape[ax]) for ax in axes) out = np.empty(arr.shape, dtype=complex) for idx_tup in product(*idxs): sl = [slice(None)] * arr.ndim for idx,ax in izip(idx_tup, axes): sl[ax] = idx out[sl] = fft(arr[sl]) return out
def dct(data): """ Compute DCT using FFT """ N = len(data)//2 fftdata = fftpack.fft(data, axis=0)[:N+1] fftdata /= N fftdata[0] /= 2. fftdata[-1] /= 2. if np.isrealobj(data): data = np.real(fftdata) else: data = fftdata return data # ---------------------------------------------------------------- # Add overloaded operators # ----------------------------------------------------------------
def stft(X, fftsize=128, step="half", mean_normalize=True, real=False, compute_onesided=True): """ Compute STFT for 1D real valued input X """ if real: local_fft = fftpack.rfft cut = -1 else: local_fft = fftpack.fft cut = None if compute_onesided: cut = fftsize // 2 if mean_normalize: X -= X.mean() if step == "half": X = halfoverlap(X, fftsize) else: X = overlap(X, fftsize, step) size = fftsize win = 0.54 - .46 * np.cos(2 * np.pi * np.arange(size) / (size - 1)) X = X * win[None] X = local_fft(X)[:, :cut] return X
def spectraltest(binin): '''The focus of this test is the peak heights in the discrete Fast Fourier Transform. The purpose of this test is to detect periodic features (i.e., repetitive patterns that are near each other) in the tested sequence that would indicate a deviation from the assumption of randomness. ''' n = len(binin) ss = [int(el) for el in binin] sc = map(sumi, ss) ft = sff.fft(sc) af = abs(ft)[1:n/2+1:] t = np.sqrt(np.log(1/0.05)*n) n0 = 0.95*n/2 n1 = len(np.where(af<t)[0]) d = (n1 - n0)/np.sqrt(n*0.95*0.05/4) pval = spc.erfc(abs(d)/np.sqrt(2)) return pval > 0.01
def DFT(x, w, N): """ Discrete Fourier Transformation(Analysis) of a given real input signal via an FFT implementation from scipy. Single channel is being supported. Args: x : (array) Real time domain input signal w : (array) Desired windowing function N : (int) FFT size Returns: magX : (2D ndarray) Magnitude Spectrum phsX : (2D ndarray) Phase Spectrum """ # Half spectrum size containing DC component hlfN = (N/2)+1 # Half window size. Two parameters to perform zero-phase windowing technique hw1 = int(math.floor((w.size+1)/2)) hw2 = int(math.floor(w.size/2)) # Window the input signal winx = x*w # Initialize FFT buffer with zeros and perform zero-phase windowing fftbuffer = np.zeros(N) fftbuffer[:hw1] = winx[hw2:] fftbuffer[-hw2:] = winx[:hw2] # Compute DFT via scipy's FFT implementation X = fft(fftbuffer) # Acquire magnitude and phase spectrum magX = (np.abs(X[:hlfN])) phsX = (np.angle(X[:hlfN])) return magX, phsX
def test_phase_randomize(): from brainiak.utils.utils import phase_randomize import numpy as np from scipy.fftpack import fft import math from scipy.stats import pearsonr # Generate auto-correlated signals nv = 2 T = 100 ns = 3 D = np.zeros((nv, T, ns)) for v in range(nv): for s in range(ns): D[v, :, s] = np.sin(np.linspace(0, math.pi * 5 * (v + 1), T)) + \ np.sin(np.linspace(0, math.pi * 6 * (s + 1), T)) freq = fft(D, axis=1) D_pr = phase_randomize(D) freq_pr = fft(D_pr, axis=1) p_corr = pearsonr(np.angle(freq).flatten(), np.angle(freq_pr).flatten())[0] assert np.isclose(abs(freq), abs(freq_pr)).all(), \ "Amplitude spectrum not preserved under phase randomization" assert abs(p_corr) < 0.03, \ "Phases still correlated after randomization"
def fft_random(n): for i in range(n): x = random.normal(0, 0.1, 2000) y = fft(x) if(i%30 == 0): process_percent = int(100 * float(i)/float(n)) current_task.update_state(state='PROGRESS', meta={'process_percent': process_percent}) return random.random()
def test(tid, n): for i in range(n): x = random.normal(0, 0.1, 2000) y = fft(x) result = dict() result['x'] = random.random() result['y'] = random.randint(100) return result # @task_success.connect(sender='celeryapp.tasks.fft_random')
def plotFFT(df): # Number of samplepoints N = len(df) # sample spacing T = BINSIZE*60.0 x = np.linspace(0.0, N*T, N) y = df.values #np.sin(50.0 * 2.0*np.pi*x) + 0.5*np.sin(80.0 * 2.0*np.pi*x) yf = fftpack.fft(y) xf = np.linspace(0.0, 1.0/(2.0*T), N/2) fig, ax = plt.subplots() ax.plot(xf, 2.0/N * np.abs(yf[:N//2])) plt.savefig(FIG_DIR+'fft.png', bbox_inches='tight')
def frequencies(self): """ :return: frequency array """ if self.baseband: return np.fft.rfftfreq(self.data_length*self.PADDING_FACTOR, self.sampling_time) else: return self.center + scipy.fftpack.fftshift( scipy.fftpack.fftfreq( self.data_length*self.PADDING_FACTOR, self.sampling_time)) #[self.useful_index()]
def _shift_fft(array, shift_value): Ndim = array.ndim dims = array.shape dtype = array.dtype.kind if (dtype != 'f'): raise ValueError('Array must be float') shifted = array if (Ndim == 1): Nx = dims[0] x_ramp = np.arange(Nx, dtype=array.dtype) - Nx//2 tilt = (2*np.pi/Nx) * (shift_value[0]*x_ramp) cplx_tilt = np.cos(tilt) + 1j*np.sin(tilt) cplx_tilt = fft.fftshift(cplx_tilt) narray = fft.fft(fft.ifft(array) * cplx_tilt) shifted = narray.real elif (Ndim == 2): Nx = dims[0] Ny = dims[1] x_ramp = np.outer(np.full(Nx, 1.), np.arange(Ny, dtype=array.dtype)) - Nx//2 y_ramp = np.outer(np.arange(Nx, dtype=array.dtype), np.full(Ny, 1.)) - Ny//2 tilt = (2*np.pi/Nx) * (shift_value[0]*x_ramp+shift_value[1]*y_ramp) cplx_tilt = np.cos(tilt) + 1j*np.sin(tilt) cplx_tilt = fft.fftshift(cplx_tilt) narray = fft.fft2(fft.ifft2(array) * cplx_tilt) shifted = narray.real else: raise ValueError('This function can shift only 1D or 2D arrays') return shifted
def harvest_get_f0_candidate_from_raw_event(boundary_f0, fs, y_spectrum, y_length, temporal_positions, f0_floor, f0_ceil): filter_length_half = int(np.round(fs / boundary_f0 * 2)) band_pass_filter_base = harvest_nuttall(filter_length_half * 2 + 1) shifter = np.cos(2 * np.pi * boundary_f0 * np.arange(-filter_length_half, filter_length_half + 1) / float(fs)) band_pass_filter = band_pass_filter_base * shifter index_bias = filter_length_half # possible numerical issues if 32 bit spectrum_low_pass_filter = np.fft.fft(band_pass_filter.astype("float64"), len(y_spectrum)) filtered_signal = np.real(np.fft.ifft(spectrum_low_pass_filter * y_spectrum)) index_bias = filter_length_half + 1 filtered_signal = filtered_signal[index_bias + np.arange(y_length).astype("int32")] negative_zero_cross = harvest_zero_crossing_engine(filtered_signal, fs) positive_zero_cross = harvest_zero_crossing_engine(-filtered_signal, fs) d_filtered_signal = filtered_signal[1:] - filtered_signal[:-1] peak = harvest_zero_crossing_engine(d_filtered_signal, fs) dip = harvest_zero_crossing_engine(-d_filtered_signal, fs) f0_candidate = harvest_get_f0_candidate_contour(negative_zero_cross, positive_zero_cross, peak, dip, temporal_positions) f0_candidate[f0_candidate > (boundary_f0 * 1.1)] = 0. f0_candidate[f0_candidate < (boundary_f0 * .9)] = 0. f0_candidate[f0_candidate > f0_ceil] = 0. f0_candidate[f0_candidate < f0_floor] = 0. return f0_candidate
def harvest(x, fs): f0_floor = 71 f0_ceil = 800 target_fs = 8000 channels_in_octave = 40. basic_temporal_positions, temporal_positions = _world_get_temporal_positions(len(x), fs) adjusted_f0_floor = f0_floor * 0.9 adjusted_f0_ceil = f0_ceil * 1.1 boundary_f0_list = np.arange(1, np.ceil(np.log2(adjusted_f0_ceil / adjusted_f0_floor) * channels_in_octave) + 1) / float(channels_in_octave) boundary_f0_list = adjusted_f0_floor * 2.0 ** boundary_f0_list y, actual_fs = harvest_get_downsampled_signal(x, fs, target_fs) fft_size = 2. ** np.ceil(np.log2(len(y) + np.round(fs / f0_floor * 4) + 1)) y_spectrum = np.fft.fft(y, int(fft_size)) raw_f0_candidates = harvest_get_raw_f0_candidates( len(basic_temporal_positions), boundary_f0_list, len(y), basic_temporal_positions, actual_fs, y_spectrum, f0_floor, f0_ceil) f0_candidates, number_of_candidates = harvest_detect_official_f0_candidates(raw_f0_candidates) f0_candidates = harvest_overlap_f0_candidates(f0_candidates, number_of_candidates) f0_candidates, f0_scores = harvest_refine_candidates(y, actual_fs, basic_temporal_positions, f0_candidates, f0_floor, f0_ceil) f0_candidates, f0_scores = harvest_remove_unreliable_candidates(f0_candidates, f0_scores) connected_f0, vuv = harvest_fix_f0_contour(f0_candidates, f0_scores) smoothed_f0 = harvest_smooth_f0_contour(connected_f0) idx = np.minimum(len(smoothed_f0) - 1, np.round(temporal_positions * 1000)).astype("int32") f0 = smoothed_f0[idx] vuv = vuv[idx] f0_candidates = f0_candidates return temporal_positions, f0, vuv, f0_candidates