We extracted the following 13 code examples from open-source Python projects to illustrate how to use scipy.signal.hanning().
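Before the examples, a minimal sketch of the window itself (not taken from any of the projects below): scipy.signal.hanning(M) returns an M-point Hann taper; recent SciPy versions expose the same window as scipy.signal.windows.hann.

import numpy as np
from scipy.signal import windows

M = 1000
w = windows.hann(M)          # same values the older scipy.signal.hanning(M) returns
print(w[:4])                 # tapers smoothly up from 0
print(np.sum(w) / M)         # ~0.5: the amplitude-correction factor used in several examples below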
def plotpsd(data, dt, ndivide=1, window=hanning, overlap_half=False, ax=None, **kwargs):
    """Plot PSD (Power Spectral Density).

    Args:
        data (np.ndarray): Input data.
        dt (float): Time between each data.
        ndivide (int): Do averaging (split data into ndivide,
            get psd of each, and average them).
        window (callable): Window function applied to each segment.
        overlap_half (bool): Split data to half-overlapped regions.
        ax (matplotlib.axes): Axis the figure is plotted on.
        kwargs (optional): Plot options passed to ax.plot().
    """
    if ax is None:
        ax = plt.gca()
    vk, psddata = psd(data, dt, ndivide, window, overlap_half)
    ax.loglog(vk, psddata, **kwargs)
    ax.set_xlabel('Frequency [Hz]', fontsize=20, color='grey')
    ax.set_ylabel('PSD', fontsize=20, color='grey')
    ax.legend()
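A possible usage sketch, assuming plotpsd and its helper psd (shown further down) are importable from this module; the noise data and sampling interval are made up for illustration.

import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import hanning   # older SciPy name; scipy.signal.windows.hann in recent versions

dt = 1e-3                               # hypothetical 1 kHz sampling
data = np.random.randn(2**14)           # white-noise test signal
plotpsd(data, dt, ndivide=8, window=hanning, label='white noise')
plt.show()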
def _lagged_coherence_1freq(x, f, Fs, N_cycles=3, f_step=1):
    """Calculate lagged coherence of x at frequency f using the hanning-taper FFT method"""
    Nsamp = int(np.ceil(N_cycles * Fs / f))

    # For each N-cycle chunk, calculate phase
    chunks = _nonoverlapping_chunks(x, Nsamp)
    C = len(chunks)
    hann_window = signal.hanning(Nsamp)
    fourier_f = np.fft.fftfreq(Nsamp, 1 / float(Fs))
    fourier_f_idx = _arg_closest_value(fourier_f, f)
    fourier_coefsoi = np.zeros(C, dtype=complex)
    for i2, c in enumerate(chunks):
        fourier_coef = np.fft.fft(c * hann_window)
        fourier_coefsoi[i2] = fourier_coef[fourier_f_idx]

    lcs_num = 0
    for i2 in range(C - 1):
        lcs_num += fourier_coefsoi[i2] * np.conj(fourier_coefsoi[i2 + 1])
    lcs_denom = np.sqrt(np.sum(np.abs(fourier_coefsoi[:-1])**2) * np.sum(np.abs(fourier_coefsoi[1:])**2))
    return np.abs(lcs_num / lcs_denom)
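The core step above, isolated as a self-contained sketch (the chunking and bin-selection helpers are assumed to behave as their names suggest): taper one N-cycle chunk with a Hann window and read off the FFT coefficient at the bin closest to f.

import numpy as np
from scipy.signal import windows

Fs, f, N_cycles = 1000., 10., 3
Nsamp = int(np.ceil(N_cycles * Fs / f))
t = np.arange(Nsamp) / Fs
chunk = np.sin(2 * np.pi * f * t)           # one synthetic 3-cycle chunk

hann_window = windows.hann(Nsamp)           # scipy.signal.hanning in older SciPy
fourier_f = np.fft.fftfreq(Nsamp, 1 / Fs)
idx = np.argmin(np.abs(fourier_f - f))      # closest FFT bin to f
coef = np.fft.fft(chunk * hann_window)[idx]
print(abs(coef), np.angle(coef))            # magnitude and phase at ~10 Hz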
def ltsd_vad(x, fs, threshold=9, winsize=8192):
    # winsize based on sample rate
    # 1024 for fs = 16000
    orig_dtype = x.dtype
    orig_scale_min = x.min()
    orig_scale_max = x.max()
    x = (x - x.min()) / (x.max() - x.min())
    # works with 16 bit
    x = x * (2 ** 15)
    x = x.astype("int32")
    window = sp.hanning(winsize)
    ltsd = LTSD(winsize, window, 5)
    s_vad = ltsd.compute(x)
    # LTSD is 50% overlap, so each "step" covers 4096 samples
    # +1 to cover the extra edge window
    n_samples = int(((len(s_vad) + 1) * winsize) // 2)
    time_s = n_samples / float(fs)
    time_points = np.linspace(0, time_s, len(s_vad))
    time_samples = (fs * time_points).astype(np.int32)
    f_vad = np.zeros_like(x, dtype=bool)  # np.bool was removed in newer NumPy; plain bool works everywhere
    offset = winsize
    for n, (ss, es) in enumerate(zip(time_samples[:-1], time_samples[1:])):
        sss = ss - offset
        if sss < 0:
            sss = 0
        ses = es - offset
        if ses < 0:
            ses = 0
        if s_vad[n + 1] < threshold:
            f_vad[sss:ses] = False
        else:
            f_vad[sss:ses] = True
    f_vad[ses:] = False
    x = x.astype("float64")
    x = x / float(2 ** 15)
    x = x * (orig_scale_max - orig_scale_min) + orig_scale_min
    x = x.astype(orig_dtype)
    return x[f_vad], f_vad
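A possible call pattern, assuming ltsd_vad and the LTSD class it relies on are importable; the file name is a placeholder.

import numpy as np
from scipy.io import wavfile

fs, x = wavfile.read('speech.wav')          # hypothetical mono input file
x = x.astype('float64')
voiced, mask = ltsd_vad(x, fs, threshold=9, winsize=8192)
print(len(voiced), 'of', len(x), 'samples kept as voiced')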
def main(fn, start, end):
    fn = Path(fn).expanduser()

    # rx_array is loading the last 45% of the waveform from the file
    rx_array = load_bin(fn, start, end)
    # peak_array holds the indexes of each peak in the waveform
    # peak_distance is the smallest distance between each peak
    peak_array, peak_distance = get_peaks(rx_array)
    l = peak_distance - 1
    print('using window: ', l, '\n')

    # remove first peak
    peak_array = peak_array[1:]
    Npulse = len(peak_array) - 1
    print(Npulse, 'pulses detected')

    wind = signal.hanning(l)
    Ntone = 2
    Nblockest = 160
    fs = 4e6  # [Hz]

    data = np.empty([Npulse, l])
    # set each row of data to window * (first l samples after each peak)
    for i in range(Npulse):
        data[i, :] = wind * rx_array[peak_array[i]:peak_array[i] + l]

    fb_est, sigma = esprit(data, Ntone, Nblockest, fs)
    print('fb_est', fb_est)
    print('sigma: ', sigma)
    drange = (3e8 * fb_est) / (2e6 / .1)
    print('range: ', drange, '\n')
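The Hann-windowing step in isolation, as a sketch with made-up pulse data (load_bin, get_peaks and esprit belong to the surrounding project and are not reproduced here):

import numpy as np
from scipy.signal import windows

l = 512                                      # window length (peak spacing - 1)
rx_array = np.random.randn(10 * l)           # stand-in for the recorded waveform
peak_array = np.arange(0, 9 * l, l)          # stand-in peak indexes

wind = windows.hann(l)                       # scipy.signal.hanning in older SciPy
data = np.empty([len(peak_array), l])
for i, p in enumerate(peak_array):
    data[i, :] = wind * rx_array[p:p + l]    # taper the l samples after each peak
print(data.shape)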
def NMREval(self, xn, xnhat):
    """ Method to perform NMR perceptual evaluation of audio quality between two signals.

    Args :
        xn    : (ndarray) 1D Array containing the true time domain signal.
        xnhat : (ndarray) 1D Array containing the estimated time domain signal.

    Returns :
        NMR   : (float) A float measurement in dB providing a perceptually weighted
                evaluation. Below -9 dB can be considered as an inaudible difference/error.

    As appears in :
    - K. Brandenburg and T. Sporer, "NMR and Masking Flag: Evaluation of Quality Using
      Perceptual Criteria," in Proceedings of the AES 11th International Conference on
      Test and Measurement, Portland, USA, May 1992, pp. 169-179.
    - J. Nikunen and T. Virtanen, "Noise-to-mask ratio minimization by weighted
      non-negative matrix factorization," in Acoustics Speech and Signal Processing
      (ICASSP), 2010 IEEE International Conference on, Dallas, TX, 2010, pp. 25-28.
    """
    # Integer window length and hop size (nfft/2 would be a float in Python 3)
    mX, _ = TimeFrequencyDecomposition.STFT(xn, hanning(self.nfft // 2 + 1), self.nfft, self.nfft // 4)
    mXhat, _ = TimeFrequencyDecomposition.STFT(xnhat, hanning(self.nfft // 2 + 1), self.nfft, self.nfft // 4)

    # Compute Error
    Err = np.abs(mX - mXhat) ** 2.

    # Acquire Masking Threshold
    mT = self.maskingThreshold(mX)

    # Inverse the filter of masking threshold
    imT = 1. / (mT + eps)

    # Outer/Middle Ear transfer function on the diagonal
    LTq = 10 ** (self.MOEar() / 20.)

    # NMR computation
    NMR = 10. * np.log10((1. / mX.shape[0]) * self._maxb * np.sum(imT * (Err * LTq)))

    print(NMR)
    return NMR
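A rough stand-in for the analysis settings above using scipy.signal.stft (a sketch only; the project's own TimeFrequencyDecomposition.STFT may scale and pad differently): an nfft/2 + 1 point Hann window hopped by nfft/4 samples.

import numpy as np
from scipy import signal

nfft = 1024
xn = np.random.randn(8 * nfft)               # placeholder time-domain signal
win = signal.windows.hann(nfft // 2 + 1)     # scipy.signal.hanning in older SciPy
f, t, X = signal.stft(xn, window=win, nperseg=len(win),
                      noverlap=len(win) - nfft // 4, nfft=nfft)
mX = np.abs(X)                               # magnitude spectrogram
print(mX.shape)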
def get_ft_windows():
    """ Retrieve the available windows to be applied on signal data before FT.

    @return: dict with keys being the window name and items being again a dict
             containing the actual function and the normalization factor to
             calculate correctly the amplitude spectrum in the Fourier Transform

    To find out the amplitude normalization factor check either the scipy
    implementation on
        https://github.com/scipy/scipy/blob/v0.15.1/scipy/signal/windows.py#L336
    or just perform a sum of the window (oscillating parts of the window should
    be averaged out and the constant offset factor will remain):
        MM = 1000000  # choose a big number
        print(sum(signal.hanning(MM)) / MM)
    """
    win = {'none': {'func': np.ones, 'ampl_norm': 1.0},
           'hamming': {'func': signal.hamming, 'ampl_norm': 1.0/0.54},
           'hann': {'func': signal.hann, 'ampl_norm': 1.0/0.5},
           'blackman': {'func': signal.blackman, 'ampl_norm': 1.0/0.42},
           'triang': {'func': signal.triang, 'ampl_norm': 1.0/0.5},
           'flattop': {'func': signal.flattop, 'ampl_norm': 1.0/0.2156},
           'bartlett': {'func': signal.bartlett, 'ampl_norm': 1.0/0.5},
           'parzen': {'func': signal.parzen, 'ampl_norm': 1.0/0.375},
           'bohman': {'func': signal.bohman, 'ampl_norm': 1.0/0.4052847},
           'blackmanharris': {'func': signal.blackmanharris, 'ampl_norm': 1.0/0.35875},
           'nuttall': {'func': signal.nuttall, 'ampl_norm': 1.0/0.3635819},
           'barthann': {'func': signal.barthann, 'ampl_norm': 1.0/0.5}}
    return win
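A usage sketch, assuming get_ft_windows is importable (its entries reference the window functions in the scipy.signal namespace): pick a window, taper a unit-amplitude sine that sits exactly on an FFT bin, and use ampl_norm to recover the amplitude.

import numpy as np

win = get_ft_windows()['hann']
N, f0, fs = 4000, 50.0, 1000.0               # f0 chosen to land exactly on a bin
t = np.arange(N) / fs
x = 1.0 * np.sin(2 * np.pi * f0 * t)         # unit-amplitude test tone

w = win['func'](N)
spec = 2 * np.abs(np.fft.rfft(x * w)) / N * win['ampl_norm']
print(spec.max())                            # ~1.0 at the 50 Hz bin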
def test_smooth():
    tr = get_rand_traj()
    assert len(tr.attrs_nstep) > 0
    trs = crys.smooth(tr, hanning(11))
    assert len(trs.attrs_nstep) > 0
    assert_attrs_not_none(trs, attr_lst=tr.attr_lst)
    for name in tr.attrs_nstep:
        a1 = getattr(tr, name)
        a2 = getattr(trs, name)
        assert a1.shape == a2.shape
        assert np.abs(a1 - a2).sum() > 0.0
    assert trs.timestep == tr.timestep
    assert trs.nstep == tr.nstep

    # reproduce data with kernel [0,1,0]
    trs = crys.smooth(tr, hanning(3))
    for name in tr.attrs_nstep:
        a1 = getattr(tr, name)
        a2 = getattr(trs, name)
        assert np.allclose(a1, a2)

    trs1 = crys.smooth(tr, hanning(3), method=1)
    trs2 = crys.smooth(tr, hanning(3), method=2)
    assert len(trs1.attrs_nstep) > 0
    assert len(trs2.attrs_nstep) > 0
    for name in tr.attrs_nstep:
        a1 = getattr(tr, name)
        a2 = getattr(trs1, name)
        a3 = getattr(trs2, name)
        assert np.allclose(a1, a2)
        assert np.allclose(a1, a3)

    trs1 = crys.smooth(tr, hanning(11), method=1)
    trs2 = crys.smooth(tr, hanning(11), method=2)
    assert len(trs1.attrs_nstep) > 0
    assert len(trs2.attrs_nstep) > 0
    for name in trs1.attrs_nstep:
        a1 = getattr(trs1, name)
        a2 = getattr(trs2, name)
        assert np.allclose(a1, a2)
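The property this test relies on, in a self-contained form (a sketch; the project's crys.smooth is not reproduced): hanning(3) is [0, 1, 0], so a normalized 3-point Hann kernel leaves the data unchanged, while a longer kernel actually smooths it.

import numpy as np
from scipy.signal import windows

x = np.random.randn(200)
k3 = windows.hann(3)                     # [0., 1., 0.]
assert np.allclose(np.convolve(x, k3 / k3.sum(), mode='same'), x)

k11 = windows.hann(11)                   # a real smoothing kernel
xs = np.convolve(x, k11 / k11.sum(), mode='same')
print(np.abs(x - xs).sum() > 0)          # True: smoothing changes the data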
def psd(data, dt, ndivide=1, window=hanning, overlap_half=False):
    """Calculate power spectrum density of data.

    Args:
        data (np.ndarray): Input data.
        dt (float): Time between each data.
        ndivide (int): Do averaging (split data into ndivide,
            get psd of each, and average them).
        window (callable): Window function applied to each segment.
        overlap_half (bool): Split data to half-overlapped regions.

    Returns:
        vk (np.ndarray): Frequency.
        psd (np.ndarray): PSD
    """
    logger = getLogger('decode.utils.ndarray.psd')

    if overlap_half:
        step = int(len(data) / (ndivide + 1))
        size = step * 2
    else:
        step = int(len(data) / ndivide)
        size = step

    if bin(len(data)).count('1') != 1:
        logger.warning('warning: length of data is not power of 2: {}'.format(len(data)))
    size = int(len(data) / ndivide)
    if bin(size).count('1') != 1:
        if overlap_half:
            logger.warning('warning: ((length of data) / (ndivide+1)) * 2 is not power of 2: {}'.format(size))
        else:
            logger.warning('warning: (length of data) / ndivide is not power of 2: {}'.format(size))

    psd = np.zeros(size)
    T = (size - 1) * dt
    vs = 1 / dt
    vk_ = fftfreq(size, dt)
    vk = vk_[np.where(vk_ >= 0)]

    for i in range(ndivide):
        d = data[i * step:i * step + size]
        if window is None:
            w = np.ones(size)
            corr = 1.0
        else:
            w = window(size)
            corr = np.mean(w**2)
        psd = psd + 2 * (np.abs(fft(d * w)))**2 / size * dt / corr

    return vk, psd[:len(vk)] / ndivide
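A possible sanity check, assuming psd is importable from this module (the default window is hanning): for white noise, integrating the one-sided PSD over frequency should roughly return the signal variance.

import numpy as np

dt = 1e-3
data = np.random.randn(2**14)                 # unit-variance white noise
vk, P = psd(data, dt, ndivide=16)
df = vk[1] - vk[0]
print(np.sum(P) * df)                         # roughly np.var(data) ~ 1.0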
def sinusoid_analysis(X, input_sample_rate, resample_block=128, copy=True):
    """
    Construct a sinusoidal model for the input signal.

    Parameters
    ----------
    X : ndarray
        Input signal to model

    input_sample_rate : int
        The sample rate of the input signal

    resample_block : int, optional (default=128)
        Controls the step size of the sinusoidal model

    Returns
    -------
    frequencies_hz : ndarray
        Frequencies for the sinusoids, in Hz.

    magnitudes : ndarray
        Magnitudes of sinusoids returned in ``frequencies``

    References
    ----------
    D. P. W. Ellis (2004), "Sinewave Speech Analysis/Synthesis in Matlab",
    Web resource, available: http://www.ee.columbia.edu/ln/labrosa/matlab/sws/
    """
    X = np.array(X, copy=copy)
    resample_to = 8000
    if input_sample_rate != resample_to:
        if input_sample_rate % resample_to != 0:
            raise ValueError("Input sample rate must be a multiple of 8k!")
        # Should be able to use resample... ?
        # resampled_count = round(len(X) * resample_to / input_sample_rate)
        # X = sg.resample(X, resampled_count, window=sg.hanning(len(X)))
        X = sg.decimate(X, input_sample_rate // resample_to, zero_phase=True)
    step_size = 2 * round(resample_block / input_sample_rate * resample_to / 2.)
    a, g, e = lpc_analysis(X, order=8, window_step=step_size,
                           window_size=2 * step_size)
    f, m = lpc_to_frequency(a, g)
    f_hz = f * resample_to / (2 * np.pi)
    return f_hz, m
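A possible call, assuming sinusoid_analysis and its LPC helpers are importable; the 16 kHz rate and file name are placeholders (the rate must be a multiple of 8 kHz).

import numpy as np
from scipy.io import wavfile

fs, X = wavfile.read('speech_16k.wav')       # hypothetical 16 kHz mono file
f_hz, m = sinusoid_analysis(X.astype('float64'), input_sample_rate=fs)
print(f_hz.shape, m.shape)                   # per-frame sinusoid frequencies and magnitudes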
def f_psd(x, Fs, method,
          Hzmed=0,
          welch_params={'window': 'hanning', 'nperseg': 1000, 'noverlap': None}):
    '''
    Calculate the power spectrum of a signal

    Parameters
    ----------
    x : array
        temporal signal
    Fs : integer
        sampling rate
    method : str in ['fftmed', 'welch']
        Method for calculating PSD
    Hzmed : float
        relevant if method == 'fftmed'
        Frequency width of the median filter
    welch_params : dict
        relevant if method == 'welch'
        Parameters to sp.signal.welch

    Returns
    -------
    f : array
        frequencies corresponding to the PSD output
    psd : array
        power spectrum
    '''
    if method == 'fftmed':
        # Calculate frequencies
        N = len(x)
        f = np.arange(0, Fs / 2, Fs / N)

        # Calculate PSD
        rawfft = np.fft.fft(x)
        psd = np.abs(rawfft[:len(f)])**2

        # Median filter
        if Hzmed > 0:
            sampmed = np.argmin(np.abs(f - Hzmed / 2.0))
            psd = signal.medfilt(psd, sampmed * 2 + 1)

    elif method == 'welch':
        f, psd = sp.signal.welch(x, fs=Fs, **welch_params)

    else:
        raise ValueError('input for PSD method not recognized')

    return f, psd
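A possible usage sketch comparing the two methods, assuming f_psd is importable; note that newer SciPy prefers the window spelling 'hann', so the default welch_params may need that adjustment.

import numpy as np

Fs = 1000.
t = np.arange(0, 10, 1 / Fs)
x = np.sin(2 * np.pi * 10 * t) + 0.5 * np.random.randn(len(t))

f1, p1 = f_psd(x, Fs, 'fftmed', Hzmed=1.)
f2, p2 = f_psd(x, Fs, 'welch',
               welch_params={'window': 'hann', 'nperseg': 1000, 'noverlap': None})
print(f1[np.argmax(p1)], f2[np.argmax(p2)])   # both should peak near 10 Hz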
def lagged_coherence(x, frange, Fs, N_cycles=3, f_step=1, return_spectrum=False):
    """
    Quantify the rhythmicity of a time series using lagged coherence.
    Return the mean lagged coherence in the frequency range as an
    estimate of rhythmicity.
    As in Fransen et al. 2015

    Parameters
    ----------
    x : array-like 1d
        voltage time series
    frange : (low, high), Hz
        frequency range for the narrowband signal of interest
    Fs : float
        The sampling rate
    N_cycles : float
        Number of cycles of the frequency of interest used in the
        lagged coherence calculation
    f_step : float, Hz
        step size at which lagged coherence is calculated across the
        frequency range
    return_spectrum : bool
        if True, return the lagged coherence for all frequency values;
        otherwise, only return the mean
    fourier_or_wavelet : string {'fourier', 'wavelet'}
        NOT IMPLEMENTED. ONLY FOURIER.
        method for estimating phase.
        fourier: calculate fourier coefficients for each time window, hanning taper.
        wavelet: multiply each window by an N-cycle wavelet

    Returns
    -------
    rhythmicity : float
        mean lagged coherence value in the frequency range of interest
    """
    # Identify Fourier components of interest
    freqs = np.arange(frange[0], frange[1] + f_step, f_step)

    # Calculate lagged coherence for each frequency
    F = len(freqs)
    lcs = np.zeros(F)
    for i, f in enumerate(freqs):
        lcs[i] = _lagged_coherence_1freq(x, f, Fs, N_cycles=N_cycles, f_step=f_step)

    if return_spectrum:
        return lcs
    else:
        return np.mean(lcs)
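A possible usage sketch, assuming lagged_coherence and its helpers above are importable: a sustained 10 Hz oscillation should score clearly higher than white noise.

import numpy as np

Fs = 1000.
t = np.arange(0, 10, 1 / Fs)
osc = np.sin(2 * np.pi * 10 * t) + 0.1 * np.random.randn(len(t))
noise = np.random.randn(len(t))

print(lagged_coherence(osc, (8, 12), Fs))     # close to 1 for a sustained rhythm
print(lagged_coherence(noise, (8, 12), Fs))   # much lower for white noise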