我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用numpy.blackman()。
def run_mgc_example(): import matplotlib.pyplot as plt fs, x = wavfile.read("test16k.wav") pos = 3000 fftlen = 1024 win = np.blackman(fftlen) / np.sqrt(np.sum(np.blackman(fftlen) ** 2)) xw = x[pos:pos + fftlen] * win sp = 20 * np.log10(np.abs(np.fft.rfft(xw))) mgc_order = 20 mgc_alpha = 0.41 mgc_gamma = -0.35 mgc_arr = win2mgc(xw, order=mgc_order, alpha=mgc_alpha, gamma=mgc_gamma, verbose=True) xwsp = 20 * np.log10(np.abs(np.fft.rfft(xw))) sp = mgc2sp(mgc_arr, mgc_alpha, mgc_gamma, fftlen) plt.plot(xwsp) plt.plot(20. / np.log(10) * np.real(sp), "r") plt.xlim(1, len(xwsp)) plt.show()
def transform(self, X_indices, y_indices): X_indices, y_indices = super(IndexBatchIterator, self).transform(X_indices, y_indices) [count] = X_indices.shape # Use preallocated space X = self.Xbuf[:count] Y = self.Ybuf[:count] window = np.blackman(SAMPLE_SIZE//DOWNSAMPLE)[None,:] for i, ndx in enumerate(X_indices): if ndx == -1: ndx = np.random.randint(len(self.source.events)) augmented = self.augmented[:,ndx:ndx+SAMPLE_SIZE] X[i] = augmented[:,::-1][:,::DOWNSAMPLE] if y_indices is not None: Y[i] = self.source.events[ndx] Y = None if (y_indices is None) else Y return X, Y
def HeterodynWithF0Track(x, tf0, f0, sr=1, nwind=1024, nhop=512, windfunc=np.blackman): ''' Calculates the amplitude near frequency f0 in x (f0 is time-varying, values given at tf0 nwind should be at least 3 periods if the signal is periodic. ''' valid_idx = np.logical_not(np.isnan(f0)) tx = np.arange(len(x))/float(sr) f0s = np.interp(tx, tf0[valid_idx], f0[valid_idx]) phs = np.cumsum(2*np.pi*f0s/sr) sinsig = np.exp(1j*phs) hamp, t = FuncWind(np.sum, x*sinsig, power=1, sr=sr, nwind=nwind, nhop=nhop, windfunc=windfunc) return np.array(hamp)*2, np.array(t)
def SpecCentWind(x, sr=1, nwind=1024, nhop=512, windfunc=np.blackman): ''' Calculates the SpectralCentroid of x, in frames of length nwind, and in steps of nhop. windfunc is used as windowing function nwind should be at least 3 periods if the signal is periodic. ''' ff = np.arange(nwind/2)/float(nwind)*sr def SCvec(xw): xf = np.fft.fft(xw) xf2 = xf[:nwind/2] return sum(np.abs(xf2)*ff)/sum(np.abs(xf2)) amp, t = FuncWind(SCvec, x, power=0, sr=sr, nwind=nwind, nhop=nhop) return np.array(amp), np.array(t)
def design_windowed_sinc_lpf(fc, bw): N = Filter.get_filter_length_from_bandwidth(bw) # Compute sinc filter impulse response h = np.sinc(2 * fc * (np.arange(N) - (N - 1) / 2.)) # We use blackman window function w = np.blackman(N) # Multiply sinc filter with window function h = h * w # Normalize to get unity gain h_unity = h / np.sum(h) return h_unity
def iFFT(Y, output_length=None, window=False): """ Inverse real-valued Fourier Transform Parameters ---------- Y : array_like Frequency domain data [Nsignals x Nbins] output_length : int, optional Lenght of returned time-domain signal (Default: 2 x len(Y) + 1) win : boolean, optional Weights the resulting time-domain signal with a Hann Returns ------- y : array_like Reconstructed time-domain signal """ Y = _np.atleast_2d(Y) y = _np.fft.irfft(Y, n=output_length) if window: if window not in {'hann', 'hamming', 'blackman', 'kaiser'}: raise ValueError('Selected window must be one of hann, hamming, blackman or kaiser') no_of_signals, no_of_samples = y.shape if window == 'hann': window_array = _np.hanning(no_of_samples) elif window == 'hamming': window_array = _np.hamming(no_of_samples) elif window == 'blackman': window_array = _np.blackman(no_of_samples) elif window == 'kaiser': window_array = _np.kaiser(no_of_samples, 3) y = window_array * y return y
def blackman(M): """Returns the Blackman window. The Blackman window is defined as .. math:: w(n) = 0.42 - 0.5 \\cos\\left(\\frac{2\\pi{n}}{M-1}\\right) + 0.08 \\cos\\left(\\frac{4\\pi{n}}{M-1}\\right) \\qquad 0 \\leq n \\leq M-1 Args: M (:class:`~int`): Number of points in the output window. If zero or less, an empty array is returned. Returns: ~cupy.ndarray: Output ndarray. .. seealso:: :func:`numpy.blackman` """ if M < 1: return from_data.array([]) if M == 1: return basic.ones(1, float) n = ranges.arange(0, M) return 0.42 - 0.5 * trigonometric.cos(2.0 * numpy.pi * n / (M - 1))\ + 0.08 * trigonometric.cos(4.0 * numpy.pi * n / (M - 1))
def smooth(x,window_len=11,window='hanning'): if x.ndim != 1: raise ValueError, "smooth only accepts 1 dimension arrays." if x.size < window_len: return x # raise ValueError, "Input vector needs to be bigger than window size." if window_len<3: return x if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: raise ValueError, "Window is one of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'" s=numpy.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]] if window == 'flat': #moving average w=numpy.ones(window_len,'d') else: w=eval('numpy.'+window+'(window_len)') y=numpy.convolve(w/w.sum(),s,mode='valid') y = y[(window_len/2-1) : -(window_len/2)-1] return y
def FuncWind(func, x, sr=1, nwind=1024, nhop=512, power=1, windfunc=np.blackman): ''' Applies a function window by window to a time series ''' nsam = len(x) ist = 0 iend = ist+nwind t = [] ret = [] wind = windfunc(nwind) if power > 0: wsumpow = sum(wind**power) else: wsumpow = 1. while (iend < nsam): thisx = x[ist:iend] xw = thisx*wind ret.append(func(xw)/wsumpow) t.append(float(ist+iend)/2.0/float(sr)) ist = ist+nhop iend = ist+nwind return np.array(ret), np.array(t)
def RMSWind(x, sr=1, nwind=1024, nhop=512, windfunc=np.blackman): ''' Calculates the RMS amplitude amplitude of x, in frames of length nwind, and in steps of nhop. windfunc is used as windowing function. nwind should be at least 3 periods if the signal is periodic. ''' nsam = len(x) ist = 0 iend = ist+nwind t = [] ret = [] wind = windfunc(nwind) wsum2 = np.sum(wind**2) while (iend < nsam): thisx = x[ist:iend] xw = thisx*wind ret.append(np.sum(xw*xw/wsum2)) t.append(float(ist+iend)/2.0/float(sr)) ist = ist+nhop iend = ist+nwind return np.sqrt(np.array(ret)), np.array(t)
def Heterodyn(x, f, sr=1, nwind=1024, nhop=512, windfunc=np.blackman): ''' Calculates the amplitude near frequency f in x nwind should be at least 3 periods if the signal is periodic. ''' sinsig = np.exp(2j*np.pi*np.arange(len(x))*f/float(sr)) hamp, t = FuncWind(np.sum, x*sinsig, power=1, sr=sr, nwind=nwind, nhop=nhop, windfunc=windfunc) return np.array(hamp)*2, np.array(t)
def SpecFlux(x, sr=1, nwind=1024, nhop=512, minf=0, maxf=np.inf, windfunc=np.blackman): ''' Calculates the spectral flux in sunud ''' nsam = len(x) # first window ist = 0 iend = ist+nwind t = [] res = [] wind = windfunc(nwind) minbin = int(minf/sr*nwind) maxbinf = (float(maxf)/sr*nwind) if maxbinf > nwind: maxbin = nwind else: maxbin = int(maxbinf) while (iend < nsam-nhop): thisx = x[ist:iend] nextx = x[ist+nhop:iend+nhop] ff = np.abs(np.fft.fft(thisx*wind)) fl = np.abs(np.fft.fft(nextx*wind)) res.append(np.sqrt(sum((ff[minbin:maxbin]-fl[minbin:maxbin])**2))) t.append(float(ist+iend+nhop)/2.0/float(sr)) ist = ist+nhop iend = ist+nwind return np.array(res), np.array(t)
def _plot_multiple_spectrum(signals, fs, labels, colors): """ plot the signals spectrum """ s = Spectrum(block_length=min(2048, signals[0].size), fs=fs, wfunc=np.blackman) for sig in signals: s.periodogram(sig, hold=True) s.plot(labels=labels, colors=colors, fscale='lin')
def _design(self): """Designs the FIR filter""" # the length of the filter order = self._get_order() half_order = (order - 1) // 2 w = np.blackman(order) t = np.linspace(-half_order, half_order, order) phase = (2.0 * np.pi * self.fc / self.fs) * t car = np.cos(phase) fir = w * car # the filter must be symmetric, in order to be zero-phase assert np.all(np.abs(fir - fir[::-1]) < 1e-15) # remove the constant component by forcing fir.sum() = 0 if self.zero_mean: fir -= fir.sum() / order gain = np.sum(fir * car) self.fir = fir * (1.0 / gain) # add the imaginary part to have a complex wavelet if self.extract_complex: car_imag = np.sin(phase) fir_imag = w * car_imag self.fir_imag = fir_imag * (1.0 / gain) return self
def get_normalized_templates(template_streams): templates = [] for ts in template_streams: t = data_conversion.stream2array(ts) t = t.astype(np.float32) t -= np.mean(t, axis=1, keepdims=True) template_max = np.amax(np.abs(t)) t /= template_max t *= np.blackman(ts[0].stats.npts) templates.append(t) return templates
def local_blackman(vec): return(blackman(len(vec)))
def spectrum_process(data, sfreq, cfreq, toffset, modulus, integration, bins, log_scale, zscale, detrend, title, clr): """ Break spectrum by modulus and display each block. Integration here acts as a pure average on the spectral data. """ if detrend: dfn = matplotlib.mlab.detrend_mean else: dfn = matplotlib.mlab.detrend_none win = numpy.blackman(bins) if modulus: block = 0 block_size = integration * modulus block_toffset = toffset while block < len(data) / block_size: vblock = data[block * block_size:block * block_size + modulus] pblock, freq = matplotlib.mlab.psd( vblock, NFFT=bins, Fs=sfreq, detrend=dfn, window=win, scale_by_freq=False) # complete integration for idx in range(1, integration): vblock = data[block * block_size + idx * modulus:block * block_size + idx * modulus + modulus] pblock_n, freq = matplotlib.mlab.psd( vblock, NFFT=bins, Fs=sfreq, detrend=dfn, window=matplotlib.mlab.window_hanning, scale_by_freq=False) pblock += pblock_n pblock /= integration yield spectrum_plot(pblock, freq, cfreq, block_toffset, log_scale, zscale, title, clr) block += 1 block_toffset += block_size / sfreq else: pdata, freq = matplotlib.mlab.psd( data, NFFT=bins, Fs=sfreq, detrend=dfn, window=win, scale_by_freq=False) yield spectrum_plot(pdata, freq, cfreq, toffset, log_scale, zscale, title, clr)
def smooth(x,window_len=11,window='hanning'): """ Smooth the data using a window with requested size. Copied from http://wiki.scipy.org/Cookbook/SignalSmooth This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the begining and end part of the output signal. :param x: the input signal :param window_len: the dimension of the smoothing window; should be an odd integer :param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing. :returns: the smoothed signal Example >>> t=linspace(-2,2,0.1) >>> x=sin(t)+randn(len(t))*0.1 >>> y=smooth(x) .. seealso:: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve, scipy.signal.lfilter .. note:: length(output) != length(input), to correct this: return y[(window_len/2-1):-(window_len/2)] instead of just y. .. todo:: the window parameter could be the window itself if an array instead of a string """ if x.ndim != 1: raise ValueError("smooth only accepts 1 dimension arrays.") if x.size < window_len: raise ValueError("Input vector needs to be bigger than window size.") if window_len<3: return x if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'") s=numpy.r_[x[window_len-1:0:-1],x,x[-1:-window_len:-1]] #print(len(s)) if window == 'flat': #moving average w=numpy.ones(window_len,'d') else: w=eval('numpy.'+window+'(window_len)') y=numpy.convolve(w/w.sum(),s,mode='valid') return y
def smooth(x,window_len=11,window='flat'): """smooth the data using a window with requested size. This method is based on the convolution of a scaled window with the signal. The signal is prepared by introducing reflected copies of the signal (with the window size) in both ends so that transient parts are minimized in the beginning and end part of the output signal. :param x: the input signal :param window_len: the dimension of the smoothing window; should be an odd integer :param window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman' flat window will produce a moving average smoothing. :return: the smoothed signal example:: t=linspace(-2,2,0.1) x=sin(t)+randn(len(t))*0.1 y=smooth(x) :see also: numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve, scipy.signal.lfilter TODO: the window parameter could be the window itself if an array instead of a string """ if x.ndim != 1: raise ValueError("smooth only accepts 1 dimension arrays.") if x.size < window_len: raise ValueError("Input vector needs to be bigger than window size.") if window_len < 3: return x if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']: raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'") s=numpy.r_[2*x[0]-x[window_len:1:-1],x,2*x[-1]-x[-1:-window_len:-1]] #print(len(s)) if window == 'flat': #moving average w = numpy.ones(window_len,'d') else: w = eval('numpy.' + window + '(window_len)') y = numpy.convolve(w/w.sum(), s, mode='same') return y[window_len-1:-window_len+1]
def compute_moving_window_int(sample: np.ndarray, fs: float, blackman_win_length: int, filter_length: int = 257, delta: float = .02) -> np.ndarray: """ :param sample: ecg sample array :param fs: sampling frequency :param blackman_win_length: length of the blackman window on which to compute the moving window integration :param filter_length: length of the FIR bandpass filter on which filtering is done on ecg sample array :param delta: to compute the weights of each band in FIR filter :return: the Moving window integration of the sample array """ # I believe these constants can be kept in a file # filter edges filter_edges = [0, 4.5 * 2 / fs, 5 * 2 / fs, 20 * 2 / fs, 20.5 * 2 / fs, 1] # gains at filter band edges gains = [0, 0, 1, 1, 0, 0] # weights weights = [500 / delta, 1 / delta, 500 / delta] # length of the FIR filter # FIR filter coefficients for bandpass filtering filter_coeff = signal.firls(filter_length, filter_edges, gains, weights) # bandpass filtered signal bandpass_signal = signal.convolve(sample, filter_coeff, 'same') bandpass_signal /= np.percentile(bandpass_signal, 90) # derivative array derivative_array = (np.array([-1.0, -2.0, 0, 2.0, 1.0])) * (1 / 8) # derivative signal (differentiation of the bandpass) derivative_signal = signal.convolve(bandpass_signal, derivative_array, 'same') derivative_signal /= np.percentile(derivative_signal, 90) # squared derivative signal derivative_squared_signal = derivative_signal ** 2 derivative_squared_signal /= np.percentile(derivative_squared_signal, 90) # blackman window blackman_window = np.blackman(blackman_win_length) # moving window Integration of squared derivative signal mov_win_int_signal = signal.convolve(derivative_squared_signal, blackman_window, 'same') mov_win_int_signal /= np.percentile(mov_win_int_signal, 90) return mov_win_int_signal
def detect_rpeak(ecg: DataStream, fs: float = 64, threshold: float = 0.5, blackman_win_len_range: float = 0.2) -> DataStream: """ This program implements the Pan Tomkins algorithm on ECG signal to detect the R peaks Since the ecg array can have discontinuity in the timestamp arrays the rr-interval calculated in the algorithm is calculated in terms of the index in the sample array The algorithm consists of some major steps 1. computation of the moving window integration of the signal in terms of blackman window of a prescribed length 2. compute all the peaks of the moving window integration signal 3. adaptive thresholding with dynamic signal and noise thresholds applied to filter out the R peak locations 4. confirm the R peaks through differentiation from the nearby peaks and remove the false peaks :param ecg: ecg array of tuples (timestamp,value) :param fs: sampling frequency :param threshold: initial threshold to detect the R peak in a signal normalized by the 90th percentile. .5 is default. :param blackman_win_len_range : the range to calculate blackman window length :return: R peak array of tuples (timestamp, Rpeak interval) """ data = ecg.data result = DataStream.from_datastream([ecg]) if len(data) == 0: result.data = [] return result sample = np.array([i.sample for i in data]) timestamp = np.array([i.start_time for i in data]) # computes the moving window integration of the signal blackman_win_len = np.ceil(fs * blackman_win_len_range) y = compute_moving_window_int(sample, fs, blackman_win_len) peak_location_values = [(i, y[i]) for i in range(2, len(y) - 1) if check_peak(y[i - 2:i + 3])] # initial RR interval average peak_location = [i[0] for i in peak_location_values] running_rr_avg = sum(np.diff(peak_location)) / (len(peak_location) - 1) rpeak_temp1 = compute_r_peaks(threshold, running_rr_avg, y, peak_location_values) rpeak_temp2 = remove_close_peaks(rpeak_temp1, sample, fs) index = confirm_peaks(rpeak_temp2, sample, fs) rpeak_timestamp = timestamp[index] rpeak_value = np.diff(rpeak_timestamp) rpeak_timestamp = rpeak_timestamp[1:] result_data = [] for k in range(len(rpeak_value)): result_data.append( DataPoint.from_tuple(rpeak_timestamp[k], rpeak_value[k].seconds + rpeak_value[k].microseconds / 1e6)) # Create resulting datastream to be returned result.data = result_data return result
def design(self, fs, fc, n_cycles=7.0, bandwidth=None, zero_mean=True): """ Designs a FIR filter that is a band-pass filter centered on frequency fc. fs : sampling frequency (Hz) fc : frequency of the carrier (Hz) n_cycles : number of oscillation in the wavelet bandwidth : bandwidth of the FIR wavelet filter (used when: bandwidth is not None and n_cycles is None) zero_mean : if True, we make sure fir.sum() = 0 """ # the length of the filter if bandwidth is None and n_cycles is not None: half_order = int(float(n_cycles) / fc * fs / 2) order = 2 * half_order + 1 elif bandwidth is not None and n_cycles is None: half_order = int(1.65 * fs / bandwidth) // 2 order = half_order * 2 + 1 else: raise ValueError('Carrier.design: n_cycles and order cannot be ' 'both None or both not None.') w = np.blackman(order) t = np.linspace(-half_order, half_order, order) phase = (2.0 * np.pi * fc / fs) * t car = np.cos(phase) fir = w * car # the filter must be symmetric, in order to be zero-phase assert np.all(np.abs(fir - fir[::-1]) < 1e-15) # remove the constant component by forcing fir.sum() = 0 if zero_mean: fir -= fir.sum() / order gain = np.sum(fir * car) self.fir = fir * (1.0 / gain) self.fs = fs # add the imaginary part to have a complex wavelet if self.extract_complex: car_imag = np.sin(phase) fir_imag = w * car_imag self.fir_imag = fir_imag * (1.0 / gain) return self