我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用scipy.signal.welch()。
def getPSD( df , dw = 0.05, roverlap = 0.5, window='hanning', detrend='constant') : """ Compute the power spectral density """ if type(df) == pd.Series : df = pd.DataFrame(df) nfft = int ( (2*pi / dw) / dx(df) ) nperseg = 2**int(log(nfft)/log(2)) noverlap = nperseg * roverlap """ Return the PSD of a time signal """ try : from scipy.signal import welch except : raise Exception("Welch function not found, please install scipy > 0.12") data = [] for iSig in range(df.shape[1]) : test = welch( df.values[:,iSig] , fs = 1. / dx(df) , window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend, return_onesided=True, scaling='density') data.append( test[1] / (2*pi) ) xAxis = test[0][:] * 2*pi return pd.DataFrame( data = np.transpose(data), index = xAxis , columns = [ "psd("+ str(x) +")" for x in df.columns ] )
def compute_power_spectral_density(self, windowed_signal): # Windowed signal of shape [channel][sample] [16 x 12000] ret = [] # Welch parameters sliding_window = 512 overlap = 0.25 n_overlap = int(sliding_window * overlap) # compute psd using Welch method freqs, power = signal.welch(windowed_signal, fs=SAMPLING_FREQUENCY, nperseg=sliding_window, noverlap=n_overlap) for psd_freq in PSD_FREQ: tmp = (freqs >= psd_freq[0]) & (freqs < psd_freq[1]) ret.append(power[:,tmp].mean(1)) return(np.log(np.array(ret) / np.sum(ret, axis=0)))
def get(self, x, **kwargs): """ """ # Check size of x: if len(x.shape) == 1: x = x.reshape(1, len(x), 1) elif len(x.shape) == 2: x = x[np.newaxis, ...] self._nelec, self._npts, self._ntrials = x.shape # Split x in window: if self._window is not None: x = np.transpose( np.array([x[:, k[0]:k[1], :] for k in self._window]), ( 1, 2, 0, 3)) # Compute PSD: return welch(x, fs=self._sf, axis=1, **kwargs)
def getPSD(self, ens): """ Calculate and return average power spectral density for ensemble `ens` using Welch's method """ print('in getpsd, ens=', ens, self) if not self.segyfile: raise Exception("File not opened") enstrcs = [t for t in self.segyfile.traces if t.header.original_field_record_number == ens] psds = [sig.welch(t.data, fs=1/self.dt, scaling='spectrum') for t in enstrcs] # psds is [(f,psd), (f,psd)...] freqs = psds[0][0] psdonly = [p for (f, p) in psds] psdonly = np.array(psdonly).transpose() psdavg = np.mean(psdonly, 1) print(freqs.shape, psdavg.shape) return(freqs, psdavg)
def plot_psd(X, label, Fs, NFFT, color=None): freqs, psd = welch(X, fs=Fs, window='hanning', nperseg=NFFT, noverlap=int(NFFT * 0.8)) freqs = freqs[freqs > 0] psd = psd[freqs > 0] plt.plot(np.log10(freqs), 10 * np.log10(psd.ravel()), label=label, color=color) ############################################################################### # Now we read in the data # # Then we plot the power spectrum of the MEG and reference channels, # apply the reference correction and add the resulting cleaned MEG channels # to our comparison.
def cwplot(fb_est,rx,t,fs:int,fn) -> None: #%% time fg,axs = subplots(1,2,figsize=(12,6)) ax = axs[0] ax.plot(t, rx.T.real) ax.set_xlabel('time [sec]') ax.set_ylabel('amplitude') ax.set_title('Noisy, jammed receive signal') #%% periodogram if DTPG >= (t[-1]-t[0]): dt = (t[-1]-t[0])/4 else: dt = DTPG dtw = 2*dt # seconds to window tstep = ceil(dt*fs) wind = ceil(dtw*fs); Nfft = zeropadfactor*wind f,Sraw = signal.welch(rx.ravel(), fs, nperseg=wind,noverlap=tstep,nfft=Nfft, return_onesided=False) if np.iscomplex(rx).any(): f = np.fft.fftshift(f); Sraw = np.fft.fftshift(Sraw) ax=axs[1] ax.plot(f,Sraw,'r',label='raw signal') fc_est = f[Sraw.argmax()] #ax.set_yscale('log') ax.set_xlim([fc_est-200,fc_est+200]) ax.set_xlabel('frequency [Hz]') ax.set_ylabel('amplitude') ax.legend() esttxt='' if fn is None: # simulation ax.axvline(ft+fb0,color='red',linestyle='--',label='true freq.') esttxt += f'true: {ft+fb0} Hz ' for e in fb_est: ax.axvline(e,color='blue',linestyle='--',label='est. freq.') esttxt += ' est: ' + str(fb_est) +' Hz' ax.set_title(esttxt)
def _psd_welch(x, sfreq, fmin=0, fmax=np.inf, n_fft=256, n_overlap=0, n_jobs=1): """Compute power spectral density (PSD) using Welch's method. x : array, shape=(..., n_times) The data to compute PSD from. sfreq : float The sampling frequency. fmin : float The lower frequency of interest. fmax : float The upper frequency of interest. n_fft : int The length of the tapers ie. the windows. The smaller it is the smoother are the PSDs. The default value is 256. If ``n_fft > len(inst.times)``, it will be adjusted down to ``len(inst.times)``. n_overlap : int The number of points of overlap between blocks. Will be adjusted to be <= n_fft. The default value is 0. n_jobs : int Number of CPUs to use in the computation. Returns ------- psds : ndarray, shape (..., n_freqs) or The power spectral densities. All dimensions up to the last will be the same as input. freqs : ndarray, shape (n_freqs,) The frequencies. """ from scipy.signal import welch dshape = x.shape[:-1] n_times = x.shape[-1] x = x.reshape(-1, n_times) # Prep the PSD n_fft, n_overlap = _check_nfft(n_times, n_fft, n_overlap) win_size = n_fft / float(sfreq) logger.info("Effective window size : %0.3f (s)" % win_size) freqs = np.arange(n_fft // 2 + 1, dtype=float) * (sfreq / n_fft) freq_mask = (freqs >= fmin) & (freqs <= fmax) freqs = freqs[freq_mask] # Parallelize across first N-1 dimensions parallel, my_pwelch, n_jobs = parallel_func(_pwelch, n_jobs=n_jobs) x_splits = np.array_split(x, n_jobs) f_psd = parallel(my_pwelch(d, noverlap=n_overlap, nfft=n_fft, fs=sfreq, freq_mask=freq_mask, welch_fun=welch) for d in x_splits) # Combining/reshaping to original data shape psds = np.concatenate(f_psd, axis=0) psds = psds.reshape(np.hstack([dshape, -1])) return psds, freqs