我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用scipy.signal.filtfilt()。
def homomorphic_envelope(x, fs=1000, f_LPF=8, order=3): """ Computes the homomorphic envelope of x Args: x : array fs : float Sampling frequency. Defaults to 1000 Hz f_LPF : float Lowpass frequency, has to be f_LPF < fs/2. Defaults to 8 Hz Returns: time : numpy array """ b, a = butter(order, 2 * f_LPF / fs, 'low') he = np.exp(filtfilt(b, a, np.log(np.abs(hilbert(x))))) return he
def highpass(data, BUTTER_ORDER=3, sampling_rate=10000, cut_off=500.0): Wn = (float(cut_off) / (float(sampling_rate) / 2.0), 0.95) b, a = signal.butter(BUTTER_ORDER, Wn, 'pass') return signal.filtfilt(b, a, data)
def lpf(x, cutoff, fs, order=5): """ low pass filters signal with Butterworth digital filter according to cutoff frequency filter uses Gustafsson’s method to make sure forward-backward filt == backward-forward filt Note that edge effects are expected Args: x (array): signal data (numpy array) cutoff (float): cutoff frequency (Hz) fs (int): sample rate (Hz) order (int): order of filter (default 5) Returns: filtered (array): low pass filtered data """ nyquist = fs / 2 b, a = butter(order, cutoff / nyquist) if not np.all(np.abs(np.roots(a)) < 1): raise PsolaError('Filter with cutoff at {} Hz is unstable given ' 'sample frequency {} Hz'.format(cutoff, fs)) filtered = filtfilt(b, a, x, method='gust') return filtered
def pb2bb(x, fs, fc, fd=None, flen=127, cutoff=None): """Convert passband signal to baseband. The baseband conversion uses a low-pass filter after downconversion, with a default cutoff frequency of `0.6*fd`, if `fd` is specified, or `1.1*fc` if `fd` is not specified. Alternatively, the user may specify the cutoff frequency explicitly. For communication applications, one may wish to use :func:`arlpy.comms.downconvert` instead, as that function supports matched filtering with a pulse shape rather than a generic low-pass filter. :param x: passband signal :param fs: sampling rate of passband signal in Hz :param fc: carrier frequency in passband in Hz :param fd: sampling rate of baseband signal in Hz (``None`` => same as `fs`) :param flen: number of taps in the low-pass FIR filter :param cutoff: cutoff frequency in Hz (``None`` means auto-select) :returns: complex baseband signal, sampled at `fd` """ if cutoff is None: cutoff = 0.6*fd if fd is not None else 1.1*fc y = x * _np.sqrt(2)*_np.exp(2j*_np.pi*fc*time(x,fs)) hb = _sig.firwin(flen, cutoff=cutoff, nyq=fs/2.0) y = _sig.filtfilt(hb, 1, y) if fd is not None and fd != fs: y = _sig.resample_poly(y, 2*fd, fs)[::2] return y
def process(self, obj_data): ''' Apply lowpass filter to data set, with changes applied in place @param obj_data: Input data with data ''' ntaps = self.ap_paramList[0]() fpassf_per = self.ap_paramList[1]() fstopf_per = self.ap_paramList[2]() wghts = self.ap_paramList[3]() miter = self.ap_paramList[4]() b_filt=signal.fir_filter_design.remez(numtaps=ntaps, bands=[0,fpassf_per,fstopf_per,.5], desired=[1,0],weight=wghts,maxiter=miter) for label, data, err in obj_data.getIterator(): data.update(signal.filtfilt(b_filt,1,data))
def process(self, obj_data): ''' Apply lowpass filter to data set @param obj_data: Input data. Changes are made in place. ''' column_names = obj_data.getDefaultColumns() ntaps = self.ap_paramList[0]() fpassf_per = self.ap_paramList[1]() fstopf_per = self.ap_paramList[2]() wghts = self.ap_paramList[3]() miter = self.ap_paramList[4]() b_filt=signal.fir_filter_design.remez(numtaps=ntaps, bands=[0,fpassf_per,fstopf_per,.5], desired=[1,0],weight=wghts,maxiter=miter) for label, data in obj_data.getIterator(): for column in column_names: obj_data.updateData(label, data.index, column, signal.filtfilt(b_filt,1,data))
def process(self, X): onset_func = zscore(self.func(X)) onset_func = signal.filtfilt(self.moving_avg_filter, 1, onset_func) onset_func = onset_func - signal.medfilt( onset_func[:,np.newaxis], self.median_kernel )[:,0] peaks = signal.argrelmax(onset_func) onsets = peaks[0][np.where(onset_func[peaks[0]] > self.threshold)] return onsets
def bfilt(cop_dat, cutoff, order): # Butterworth filter b,a = signal.butter(order, cutoff) # filter coronal cor_lp = signal.filtfilt(b, a, cop_dat[:,0]) # filter sagittal sag_lp = signal.filtfilt(b, a, cop_dat[:,1]) return np.concatenate((to_sing(cor_lp),to_sing(sag_lp),to_sing(cop_dat[:,2])),axis=1)
def _analog_filter(self, ftype, highpass=None, lowpass=None, order=3, zerophase=True): ' Use a classic analog filter on the data, currently butter or bessel' from scipy.signal import butter, bessel filter_types = {'butter': butter, 'bessel': bessel} afilter = filter_types[ftype] if highpass is None and lowpass is not None: b, a = afilter(order, lowpass / (self.sr / 2), btype='lowpass') elif highpass is not None and lowpass is None: b, a = afilter(order, highpass / (self.sr / 2), btype='highpass') elif highpass is not None and lowpass is not None: if highpass < lowpass: b, a = afilter(order, (highpass / (self.sr / 2), lowpass / (self.sr / 2)), btype='bandpass') else: b, a = afilter(order, (lowpass / (self.sr / 2), highpass / (self.sr / 2)), btype='bandstop') if zerophase: return self.filtfilt(b, a) else: return self.lfilter(b, a)
def filtfilt(self, b, a): " Performs forward backward filtering on the stream." from scipy.signal import filtfilt def filter_func(x): return filtfilt(b, a, x, axis=0) return self.new_stream(self.vector_map(filter_func))
def abs_and_smooth(x, sr, lp=100): abs_x = np.abs(x) if len(x.shape) > 1: abs_x = np.sum(abs_x, axis=-1) # sum over last dimension eg sum over channels b, a = butter(3, lp / 2 / sr, btype="low") filtered_x = filtfilt(b, a, abs_x, axis=0) return filtered_x
def test_filtfilt(): sr = 1000 freq = 100 b, a = butter(3, freq / (sr / 2), 'high') for data in (data2, data3, data4): x = filtfilt(b, a, data, axis=0) y = Stream(data, chunksize=211, sr=sr).filtfilt(b, a).call() assert eq(x, y)
def butter_lowpass_filter(data, cut, fs, order, zero_phase=False): from scipy.signal import butter, lfilter, filtfilt nyq = 0.5 * fs cut = cut / nyq b, a = butter(order, cut, btype='low') y = (filtfilt if zero_phase else lfilter)(b, a, data) return y
def apply_butter_filter(self, data, order, cutoff, btype): cutoff /= self.nyq b, a = butter(N=order, Wn=cutoff, btype=btype) return filtfilt(b, a, data, axis=1)
def _getFiltDesign(sf, f, npts, filtname, cycle, order, axis): """Get the designed filter sf : sample frequency f : frequency vector/list [ex : f = [2,4]] npts : number of points - 'fir1' - 'butter' - 'bessel' """ if type(f) != np.ndarray: f = np.array(f) # fir1 filter : if filtname == 'fir1': fOrder = fir_order(sf, npts, f[0], cycle=cycle) b, a = fir1(fOrder, f/(sf / 2)) # butterworth filter : elif filtname == 'butter': b, a = butter(order, [(2*f[0])/sf, (2*f[1])/sf], btype='bandpass') fOrder = None # bessel filter : elif filtname == 'bessel': b, a = bessel(order, [(2*f[0])/sf, (2*f[1])/sf], btype='bandpass') fOrder = None def filtSignal(x): return filtfilt(b, a, x, padlen=fOrder, axis=axis) return filtSignal
def fir_filt(x, Fs, Fc, fOrder): (b, a) = fir1(fOrder, Fc / (Fs / 2)) return filtfilt(b, a, x, padlen=fOrder) #################################################################### # - Morlet : ####################################################################
def butter_lowpass_filtfilt(data, *, cutoff, fs, order=5): """Lowpass filter data using a zero-phase filt-filt butterworth filter. Performs zero-phase digital filtering by processing the input data in both the forward and reverse directions. """ b, a = butter_lowpass(cutoff, fs, order=order) y = filtfilt(b, a, data, padlen=150) return y
def filtfiltlong(finname, foutname, fmt, b, a, buffer_len=100000, overlap_len=100, max_len=-1): """Use memmap and chunking to filter continuous data. Inputs: finname - foutname - fmt - data format eg 'i' b,a - filter coefficients buffer_len - how much data to process at a time overlap_len - how much data do we add to the end of each chunk to smooth out filter transients max_len - how many samples to process. If set to -1, processes the whole file Outputs: y - The memmapped array pointing to the written file Notes on algorithm: 1. The arrays are memmapped, so we let pylab (numpy) take care of handling large arrays 2. The filtering is done in chunks: Chunking details: |<------- b1 ------->||<------- b2 ------->| -----[------*--------------{-----*------]--------------*------}---------- |<-------------- c1 -------------->| |<-------------- c2 -------------->| From the array of data we cut out contiguous buffers (b1,b2,...) and to each buffer we add some extra overlap to make chunks (c1,c2). The overlap helps to remove the transients from the filtering which would otherwise appear at each buffer boundary. """ x = pylab.memmap(finname, dtype=fmt, mode='r') if max_len == -1: max_len = x.size y = pylab.memmap(foutname, dtype=fmt, mode='w+', shape=max_len) for buff_st_idx in xrange(0, max_len, buffer_len): chk_st_idx = max(0, buff_st_idx - overlap_len) buff_nd_idx = min(max_len, buff_st_idx + buffer_len) chk_nd_idx = min(x.size, buff_nd_idx + overlap_len) rel_st_idx = buff_st_idx - chk_st_idx rel_nd_idx = buff_nd_idx - chk_st_idx this_y_chk = filtfilt(b, a, x[chk_st_idx:chk_nd_idx]) y[buff_st_idx:buff_nd_idx] = this_y_chk[rel_st_idx:rel_nd_idx] return y
def periodic_derivative(x,y,max_periods): plot = False Ns =len(x) b,a = signalbutter(8,2.0*max_periods/Ns) ymid =interp(x+0.5*(x[1]-x[0]),x,y,period=2*np_pi) yder = diff(ymid)/diff(x) #yder = Ns/(max(x)-min(x))*fftpackdiff(y,1,Ns) yder_filt = deepcopy(yder) x_filt = deepcopy(x) x_filt = npappend(x_filt,x_filt[-1]+x_filt[1]-x_filt[0]) yder_filt = signalfiltfilt(b,a,npappend(yder_filt,yder_filt[0])) if plot: plt.figure(1) plt.subplot(311) plt.plot(x, y) plt.subplot(312) plt.plot(x[0:-1],yder) plt.subplot(313) plt.plot(x_filt[0:-1],yder_filt) plt.show() return yder_filt #x = numpy.array(range(100)) #y = numpy.sin(twopi*x/100)+numpy.sin(twopi*x/10) #periodic_derivative(x,y,4)
def filter(self, s, axis=0): """Filter new data. """ if self.bandType == 'allpass': return s if self.bandType == 'allstop': return np.zeros_like(s) """ Should be very close to filtfilt, padding? XXX - idfah if self.zeroPhase: rev = [slice(None),]*s.ndim rev[axis] = slice(None,None,-1) #ziScaled = self.scaleZi(s[rev], axis) y, newZi = spsig.lfilter(self.numCoef, self.denomCoef, s[rev], axis=axis, zi=newZi) y = y[rev] """ # if zeroPhase and signal is shorter than padlen (default in filtfilt function) if self.zeroPhase and \ (3*max(len(self.numCoef), len(self.denomCoef))) < s.shape[axis]: # need astype below since filtfilt calls lfilter_zi, which does not preserve dtype XXX - idfah return spsig.filtfilt(self.numCoef, self.denomCoef, s, axis=axis, padtype='even').astype(self.dtype, copy=False) else: ziScaled = self.scaleZi(s, axis) # even padding to help reduce edge effects nPad = 3*max(len(self.numCoef), len(self.denomCoef)) sPad = np.apply_along_axis(np.pad, axis, s, pad_width=nPad, mode='reflect') # edge for constant padding slc = [slice(nPad,-nPad) if i == axis else slice(None) for i in range(s.ndim)] y, newZi = spsig.lfilter(self.numCoef, self.denomCoef, sPad, axis=axis, zi=ziScaled) return y[slc]
def high_pass(self,data_buffer): [b1, a1] = [self.high_pass_coefficients[0],self.high_pass_coefficients[1]] high_passed = signal.filtfilt(b1,a1,data_buffer) return high_passed
def low_pass(self,data_buffer): [b, a] = [self.low_pass_coefficients[0],self.low_pass_coefficients[1]] low_passed = signal.filtfilt(b,a,data_buffer) return low_passed
def low_pass_filter(x, order, cutOffFrequency, samplingFrequency): nyq = samplingFrequency*0.5 cut = cutOffFrequency/nyq b, a = signal.butter(order, cut, btype='low') y = signal.filtfilt(b, a, x) return y
def imu_filter_lowpass(x, order = 4, sRate = 148.148148148148, highcut = 20.0): """ Forward-backward band-pass filtering (IIR butterworth filter) """ nyq = 0.5 * sRate high = highcut/nyq b, a = butter(N =order, Wn = high, btype = 'low') return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd', padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def imu_filter_highpass(x, order = 4, sRate = 148.148148148148, lowcut = 0.01): """ Forward-backward band-pass filtering (IIR butterworth filter) """ nyq = 0.5 * sRate low = lowcut/nyq b, a = butter(N =order, Wn = low, btype = 'high') return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd', padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def imu_filter_bandpass(x, order = 4, sRate = 148.148148148148, lowcut = 1., highcut = 20.): """ Forward-backward band-pass filtering (IIR butterworth filter) """ nyq = 0.5 * sRate low = lowcut/nyq high = highcut/nyq b, a = butter(N =order, Wn = [low, high], btype = 'band') return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd', padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def imu_filter_comb(x): """ Comb filtering at 50 Hz. Coefficients are computed with MATLAB.""" b = np.zeros(41) a = np.zeros(41) b[0] = 0.941160767899653 b[-1] = -0.941160767899653 a[0] = 1. a[-1] = -0.882321535799305 return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd', padlen = np.minimum(3*len(a)*len(b), x.shape[0]-1))
def glove_filter(self, order = 4, highcut = 2): nyq = 0.5 * self.sRate['glove'] high = highcut/nyq b, a = butter(N=order, Wn = high, btype = 'lowpass') self.glove = filtfilt(b=b, a=a, x=self.glove, axis=0)
def emg_filter_comb(x): """ Comb filtering at 50 Hz, for 2KHz sampling frequency. Coefficients are computed with MATLAB.""" b = np.zeros(41) a = np.zeros(41) b[0] = 0.941160767899653 b[-1] = -0.941160767899653 a[0] = 1. a[-1] = -0.882321535799305 return filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd', padlen = np.minimum(3*np.maximum(len(a),len(b)), x.shape[0]-1))
def glove_filter_lowpass(x, order = 4, sRate = 2000., highcut = 1.): """ Forward-backward band-pass filtering (IIR butterworth filter) """ nyq = 0.5 * sRate high = highcut/nyq b, a = butter(order, high, btype = 'low') filtered = filtfilt(b=b, a=a, x=x, axis=0, method = 'pad', padtype = 'odd') return filtered
def generate_noise(D,N): """Generate data for the changepoint detection. Data can either be of type 0 or type 1, but when it's a combination fo both, we define a target label Input - D,N Dimenstionality arguments D dimensions over N samples Output - Data in format X is a matrix in R^{N x D} y is a matrix in R^{N,} not to donfuse with {N,1}""" #Check if we have even D, so we can split the array in future assert D%2 == 0, 'We need even number of dimensions' Dhalf = int(D/2) ratioP = 0.5 #balance of targets X = np.random.randn(N,D) y = np.zeros(N) #Generate two filter cofficients filters = {} filters['b1'],filters['a1'] = signal.butter(4,2.0*cutoff1/fs,btype='lowpass') filters['b0'],filters['a0'] = signal.butter(4,2.0*cutoff0/fs,btype='lowpass') for i in xrange(N): if np.random.rand() > 0.5: #Half of the samples will have changepoint, other half wont signalA = signal.filtfilt(filters['b1'],filters['a1'],X[i]) signalB = signal.filtfilt(filters['b0'],filters['a0'],X[i]) X[i] = np.concatenate((signalA[:Dhalf],signalB[:Dhalf]),axis=0) #Concatenate the two signals if True: #Boolean: do you want to introduce a pattern at the changepoint? Dstart = int(Dhalf-pattern_len/2) X[i,Dstart:Dstart+pattern_len] = pattern y[i] = 1 #The target label else: mode = int(np.random.rand()>ratioP) X[i] = signal.filtfilt(filters['b'+str(mode)],filters['a'+str(mode)],X[i]) y[i] = 0 #The target label return X,y
def generate_noise(D,N): """Generate data for the changepoint detection. Data can either be of type 0 or type 1, but when it's a combination fo both, we define a target label Input - D,N Dimenstionality arguments D dimensions over N samples Output - Data in format X is a matrix in R^{N x D} y is a matrix in R^{N,} not to donfuse with {N,1}""" #Check if we have even D, so we can split the array in future assert D%2 == 0, 'We need even number of dimensions' Dhalf = int(D/2) ratioP = 0.5 #balance of targets X = np.random.randn(N,D) y = np.zeros(N) mark = np.zeros(N) #Generate two filter cofficients filters = {} filters['b1'],filters['a1'] = signal.butter(4,2.0*cutoff1/fs,btype='lowpass') filters['b0'],filters['a0'] = signal.butter(4,2.0*cutoff0/fs,btype='lowpass') for i in xrange(N): if np.random.rand() > 0.5: #Half of the samples will have changepoint, other half wont Dcut = np.random.randint(pattern_len,D-pattern_len) signalA = signal.filtfilt(filters['b1'],filters['a1'],X[i]) signalB = signal.filtfilt(filters['b0'],filters['a0'],X[i]) X[i] = np.concatenate((signalA[:Dcut],signalB[Dcut:]),axis=0) #Concatenate the two signals if True: #Boolean: do you want to introduce a pattern at the changepoint? Dstart = int(Dcut - pattern_len/2) X[i,Dstart:Dstart+pattern_len] = pattern y[i] = 1 #The target label mark[i] = Dcut else: mode = int(np.random.rand()>ratioP) X[i] = signal.filtfilt(filters['b'+str(mode)],filters['a'+str(mode)],X[i]) y[i] = 0 #The target label return X,y,mark
def generate_noise(D,N): """Generate data for the changepoint detection. Data can either be of type 0 or type 1, but when it's a combination fo both, we define a target label Input - D,N Dimenstionality arguments D dimensions over N samples Output - Data in format X is a matrix in R^{N x D} y is a matrix in R^{N,} not to donfuse with {N,1}""" #Check if we have even D, so we can split the array in future assert D%2 == 0, 'We need even number of dimensions' ratioP = 0.5 #balance of targets X = np.random.randn(N,D) y = np.zeros(N) mark = np.zeros(N) #Generate two filter cofficients filters = {} filters['b1'],filters['a1'] = signal.butter(4,2.0*cutoff1/fs,btype='lowpass') filters['b0'],filters['a0'] = signal.butter(4,2.0*cutoff0/fs,btype='lowpass') for i in xrange(N): if np.random.rand() > 0.5: #Half of the samples will have changepoint, other half wont Dcut = np.random.randint(pattern_len,D-pattern_len) signalA = signal.filtfilt(filters['b1'],filters['a1'],X[i]) signalB = signal.filtfilt(filters['b0'],filters['a0'],X[i]) X[i] = np.concatenate((signalA[:Dcut],signalB[Dcut:]),axis=0) #Concatenate the two signals if True: #Boolean: do you want to introduce a pattern at the changepoint? Dstart = int(Dcut - pattern_len/2) X[i,Dstart:Dstart+pattern_len] = pattern y[i] = 1 #The target label mark[i] = Dcut else: mode = int(np.random.rand()>ratioP) X[i] = signal.filtfilt(filters['b'+str(mode)],filters['a'+str(mode)],X[i]) y[i] = 0 #The target label return X,y,mark
def _decimate(x, q): """ Downsample the signal after low-pass filtering to avoid aliasing. An order 16 Chebyshev type I filter is used. Parameters ---------- x : ndarray The signal to be downsampled, as an N-dimensional array. q : int The downsampling factor. Returns ------- y : ndarray The down-sampled signal. """ if not isinstance(q, int): raise TypeError("q must be an integer") b, a = signal.filter_design.cheby1(16, 0.025, 0.98 / q) y = signal.filtfilt(b, a, x, axis=-1) sl = [slice(None)] * y.ndim sl[-1] = slice(None, None, q) return y[sl]
def _update_raw_data(params): """Helper only needs to be called when time or proj is changed""" from scipy.signal import filtfilt start = params['t_start'] stop = params['raw'].time_as_index(start + params['duration'])[0] start = params['raw'].time_as_index(start)[0] data_picks = _pick_data_channels(params['raw'].info) data, times = params['raw'][:, start:stop] if params['projector'] is not None: data = np.dot(params['projector'], data) # remove DC if params['remove_dc'] is True: data -= np.mean(data, axis=1)[:, np.newaxis] if params['ba'] is not None: data[data_picks] = filtfilt(params['ba'][0], params['ba'][1], data[data_picks], axis=1, padlen=0) # scale for di in range(data.shape[0]): data[di] /= params['scalings'][params['types'][di]] # stim channels should be hard limited if params['types'][di] == 'stim': norm = float(max(data[di])) data[di] /= norm if norm > 0 else 1. # clip if params['clipping'] == 'transparent': data[np.logical_or(data > 1, data < -1)] = np.nan elif params['clipping'] == 'clamp': data = np.clip(data, -1, 1, data) params['data'] = data params['times'] = times
def _filtfilt(*args, **kwargs): """wrap filtfilt, excluding padding arguments""" from scipy.signal import filtfilt # cut out filter args if len(args) > 4: args = args[:4] if 'padlen' in kwargs: del kwargs['padlen'] return filtfilt(*args, **kwargs)
def get_filtfilt(): """Helper to get filtfilt from scipy""" from scipy.signal import filtfilt if 'padlen' in _get_args(filtfilt): return filtfilt return _filtfilt
def apply_filter(x, filter=None, axis=0): """Apply a filter to an array.""" if isinstance(x, list): x = np.asarray(x) if x.shape[axis] == 0: return x b, a = filter return signal.filtfilt(b, a, x[:], axis=axis)
def bandfilter(data,fa=None,fb=None,Fs=1000.,order=4,zerophase=True,bandstop=False): N = len(data) assert len(shape(data))==1 padded = zeros(2*N,dtype=data.dtype) padded[N/2:N/2+N]=data padded[:N/2]=data[N/2:0:-1] padded[N/2+N:]=data[-1:N/2-1:-1] if not fa==None and not fb==None: if bandstop: b,a = butter(order,array([fa,fb])/(0.5*Fs),btype='bandstop') else: b,a = butter(order,array([fa,fb])/(0.5*Fs),btype='bandpass') elif not fa==None: # high pass b,a = butter(order,fa/(0.5*Fs),btype='high') assert not bandstop elif not fb==None: # low pass b,a = butter(order,fb/(0.5*Fs),btype='low') assert not bandstop else: assert 0 if zerophase: return filtfilt(b,a,padded)[N/2:N/2+N] else: return lfilter(b,a,padded)[N/2:N/2+N] assert 0 # now try it with a broad spectrum
def bandpass_filter(data,fa=None,fb=None, Fs=1000.,order=4,zerophase=True,bandstop=False): ''' IF fa is None, assumes lowpass with cutoff fb IF fb is None, assume highpass with cutoff fa Array can be any dimension, filtering performed over last dimension Args: data (ndarray): data, filtering performed over last dimension fa (number): low-frequency cutoff. If none, highpass at fb fb (number): high-frequency cutoff. If none, lowpass at fa order (1..6): butterworth filter order. Default 4 zerophase (boolean): Use forward-backward filtering? (true) bandstop (boolean): Do band-stop rather than band-pass Parameters ---------- Returns ------- ''' N = data.shape[-1] padded = np.zeros(data.shape[:-1]+(2*N,),dtype=data.dtype) padded[...,N//2 :N//2+N] = data padded[..., :N//2 ] = data[...,N//2:0 :-1] padded[...,N//2+N: ] = data[...,-1 :N//2-1:-1] if not fa is None and not fb is None: if bandstop: b,a = butter(order,np.array([fa,fb])/(0.5*Fs),btype='bandstop') else: b,a = butter(order,np.array([fa,fb])/(0.5*Fs),btype='bandpass') elif not fa==None: # high pass b,a = butter(order,fa/(0.5*Fs),btype='high') assert not bandstop elif not fb==None: # low pass b,a = butter(order,fb/(0.5*Fs),btype='low') assert not bandstop else: raise Exception('Both fa and fb appear to be None') return (filtfilt if zerophase else lfilter)(b,a,padded)[...,N//2:N//2+N] assert 0
def bandpass(w, freqlo, freqhi, fs, npass=2): wn = [2*freqlo/fs, 2*freqhi/fs] (b,a) = signal.butter(npass, wn, btype='band') w = signal.lfilter(b, a, w) return w # A forward-backward linear filter (filtfilt) (added by DmBorisov)
def bandpass2(w, freqlo, freqhi, fs, npass=3): wn = [2*freqlo/fs, 2*freqhi/fs] (b,a) = signal.butter(npass, wn, 'bandpass') w = signal.filtfilt(b, a, w) return w
def butter_lowpass_filter(data, cutoff_freq, nyq_freq, order=4): # Build and apply filter to data (signal) b, a = butter_lowpass(cutoff_freq, nyq_freq, order=order) y = signal.filtfilt(b, a, data) return y
def calcPhase(): ipctx1 = dict() ipctx1["ip"] = np.zeros((1,2001)) dt = p.params["dt"] randSamp = np.random.randint(0,len(AllAs),1000) # Phases of three nuclei phasMus = dict() Flags = [] Flags.append("SWA") phasTAvsTA = [] phasTIvsSTN = [] phasTAvsSTN = [] for i,samp in enumerate(randSamp): #temp = np.zeros((8,len(freqs),len(fftfreq)/10+1)) A = AllAs[samp] B = AllBs[samp] Rates = psGA.calcRates(Flags,1.0,A,B,False,ipctx1,iptau=p.params["iptau"]) # Filter the signals first at SWA frequency, or the phase difference between cortical input signal and TA,TI not calculated correctly due to multiple frequencies in the signal #B, A = sciSig.butter(2,np.array([0.0001,0.0005]),btype='low') B, A = sciSig.butter(2,np.array([0.00005]),btype='low') taFilt = sciSig.filtfilt(B, A, Rates['ta'])#, padlen=150) tiFilt = sciSig.filtfilt(B, A, Rates['ti'])#, padlen=150) stnFilt = sciSig.filtfilt(B, A, Rates['stn'])#, padlen=150) tG = np.arange(0,len(ipctx1["ip"][0])+dt,dt) fftfreq = np.fft.fftfreq(len(taFilt),d=dt) fftta = np.fft.rfft(taFilt-np.mean(taFilt)) fftti = np.fft.rfft(tiFilt-np.mean(tiFilt)) fftstn = np.fft.rfft(stnFilt-np.mean(stnFilt)) fftip = np.fft.rfft(Rates['ipctx'] -np.mean(Rates['ipctx'])) maxta = np.where(np.abs(fftta)==np.max(np.abs(fftta)))[0] maxti = np.where(np.abs(fftti)==np.max(np.abs(fftti)))[0] maxstn = np.where(np.abs(fftstn) == np.max(np.abs(fftstn)))[0] maxip = np.where(np.abs(fftip) == np.max(np.abs(fftip)))[0] print "maxta,maxti,maxstn,maxip",maxta,maxti,maxstn,maxip phasTAvsTA.append(np.angle(fftta[maxta]/fftta[maxta])) phasTIvsSTN.append(np.mean([np.angle(fftti[maxti]/fftstn[maxti]),np.angle(fftti[maxstn]/fftstn[maxstn])] )) phasTAvsSTN.append(np.mean([np.angle(fftta[maxta]/fftstn[maxta]),np.angle(fftta[maxstn]/fftstn[maxstn])] )) phasMus["ta_ta"] = phasTAvsTA phasMus["stn_ti"] = phasTIvsSTN phasMus["stn_ta"] = phasTAvsSTN pickle.dump(phasMus,open(path5+"Phases.pickle","w"))
def filtdata(x, sf, f, axis, filt, cycle, filtorder): """Filt the data using a forward/backward filter to avoid phase shifting. Parameters ---------- x : array_like Array of data sf : float Sampling frequency f : array_like Frequency vector of shape (N, 2) axis : int Axis where the time is located. filt : string Name of the filter to use (only if dcomplex is 'hilbert'). Use either 'eegfilt', 'butter' or 'bessel'. filtorder : int Order of the filter (only if dcomplex is 'hilbert') cycle : int Number of cycles to use for fir1 filtering. """ # fir1 filter : if filt == 'fir1': forder = fir_order(sf, x.shape[axis], f[0], cycle=cycle) b, a = fir1(forder, f / (sf / 2)) # butterworth filter : elif filt == 'butter': b, a = butter(filtorder, [(2 * f[0]) / sf, (2 * f[1]) / sf], btype='bandpass') forder = None # bessel filter : elif filt == 'bessel': b, a = bessel(filtorder, [(2 * f[0]) / sf, (2 * f[1]) / sf], btype='bandpass') forder = None return filtfilt(b, a, x, padlen=forder, axis=axis) ############################################################################### ############################################################################### # FILTER ORDER ############################################################################### ###############################################################################
def bandpass_filter(data, lowcut=None, highcut=None, *, numtaps=None, fs=None): """Band filter data using a zero phase FIR filter (filtfilt). Parameters ---------- data : AnalogSignalArray, ndarray, or list lowcut : float, optional (default 1 Hz) Lower cut-off frequency highcut : float, optional (default 600 Hz) Upper cut-off frequency numtaps : int, optional (default 25) Number of filter taps fs : float, optional if AnalogSignalArray is passed Sampling frequency (Hz) Returns ------- filtered : same type as data """ if numtaps is None: numtaps = 25 if lowcut is None: lowcut = 1 if highcut is None: highcut = 600 if isinstance(data, (np.ndarray, list)): if fs is None: raise ValueError("sampling frequency must be specified!") # Generate filter for detection b = firwin(numtaps=numtaps, cutoff=[lowcut/(fs/2), highcut/(fs/2)], pass_zero=False) # Filter raw data to get ripple data ripple_data = filtfilt(b, 1, data) return ripple_data elif isinstance(data, AnalogSignalArray): if fs is None: fs = data.fs warnings.warn("no sampling frequency provided," " using fs={} Hz from AnalogSignalArray".format(fs)) # Generate filter for detection b = firwin(numtaps=numtaps, cutoff=[lowcut/(fs/2), highcut/(fs/2)], pass_zero=False) # Filter raw data to get ripple data ripple_data = filtfilt(b,1,data.ydata) # Return a copy of the AnalogSignalArray with the filtered data filtered_analogsignalarray = data.copy() filtered_analogsignalarray._ydata = ripple_data return filtered_analogsignalarray else: raise TypeError( "Unknown data type {} to filter.".format(str(type(data))))
def spike_filtfilt(data, lowcut=None, highcut=None, *, fs=None, verbose=False): """Filter data to the spike band (default 600--6000 Hz). Parameters ---------- data : AnalogSignalArray, ndarray, or list lowcut : float, optional (default 600 Hz) Lower cut-off frequency highcut : float, optional (default 6000 Hz) Upper cut-off frequency fs : float, optional if AnalogSignalArray is passed Sampling frequency (Hz) Returns ------- filtered : same type as data """ if isinstance(data, (np.ndarray, list)): if fs is None: raise ValueError("sampling frequency must be specified!") elif isinstance(data, AnalogSignalArray): if fs is None: fs = data.fs if lowcut is None: lowcut = 600 if highcut is None: highcut = 6000 [b, a] = butter(2, lowcut/(fs/2), btype='highpass') [bhigh, ahigh] = butter(1, highcut/(fs/2)) if isinstance(data, (np.ndarray, list)): # Filter raw data spikedata = filtfilt(b, a,filtfilt(bhigh, ahigh, data)) return spikedata elif isinstance(data, AnalogSignalArray): spikedata = filtfilt(b, a, filtfilt(bhigh, ahigh, data.ydata)) # Return a copy of the AnalogSignalArray with the filtered data out = copy.copy(data) out._ydata = spikedata return out
def butterLP(self): #===== parameter ===== coefficients = self.trendLine() axis = str(self.axis_combobox.currentText()) N=10 time_interval = 0.007 sample_rate = int(1/time_interval) Nquist_freq = 0.5 * sample_rate Wn = float(self.filtbutter_lp_slider.value()) / 1430.0 #===== input ===== if axis[0] =='a': input_signal = self.acc_normalize(self.raw_data[axis]) elif axis[0]=='g': input_signal = self.gyro_normalize(self.raw_data[axis]) elif axis[0]=='m': input_signal = self.raw_data[axis] #===== caculation area ===== b, a = signal.butter(N, Wn, 'low') #d, c = signal.bessel(N, Wn, 'low') butter_lp_signal = signal.filtfilt(b, a, input_signal) #===== output ===== raw_a, raw_v, raw_s = self.another_integral(input_signal) butter_lp_a, butter_lp_v, butter_lp_s = self.another_integral(butter_lp_signal) #butter_lp_signal #bessel_lp_a, bessel_lp_v, bessel_lp_s = self.basic_basic_integral(bessel_lp_signal) print "Max: " + str(max(butter_lp_s)) + "; Min: " + str(min(butter_lp_s)) #===== raw vs filt ===== plt.title(axis) plt.subplot(311) plt.grid(True) plt.plot(raw_a, "blue", label="raw_a") plt.plot(butter_lp_a, "red", label="butter_lp_a") plt.legend(loc=2, fontsize = 'medium') plt.subplot(312) plt.grid(True) plt.plot(raw_v, "blue", label="raw_a") plt.plot(butter_lp_v, "red", label="butter_lp_v") plt.legend(loc=2, fontsize = 'medium') plt.subplot(313) plt.grid(True) plt.plot(raw_s, "blue", label="raw_a") plt.plot(butter_lp_s, "red", label="butter_lp_s") plt.legend(loc=2, fontsize = 'medium') plt.show() self.filtbutter_lp_slider.setFocus()