我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用scipy.signal.detrend()。
def _apply_detrend(da, axis_num): """Wrapper function for applying detrending""" if da.chunks: func = detrend_wrap(detrendn) da = xr.DataArray(func(da.data, axes=axis_num), dims=da.dims, coords=da.coords) else: if da.ndim == 1: da = xr.DataArray(sps.detrend(da), dims=da.dims, coords=da.coords) else: da = detrendn(da, axes=axis_num) # else: # raise ValueError("Data should be dask array.") return da
def _apply_method(x, fMeth, dtrd, method, wltCorr, wltWidth): npts, ntrial = x.shape nFce = len(fMeth) xf = np.zeros((nFce, npts, ntrial)) # Detrend the signal : if dtrd: x = detrend(x, axis=0) # Apply methods : for k in range(nFce): xf[k, ...] = fMeth[k](x) # Correction for the wavelet (due to the wavelet width): if (method == 'wavelet') and (wltCorr is not None): w = 3*wltWidth xf[:, 0:w, :] = xf[:, w+1:2*w+1, :] xf[:, npts-w:npts, :] = xf[:, npts-2*w-1:npts-w-1, :] return xf
def test_dft_4d(): """Test the discrete Fourier transform on 2D data""" N = 16 da = xr.DataArray(np.random.rand(N,N,N,N), dims=['time','z','y','x'], coords={'time':range(N),'z':range(N), 'y':range(N),'x':range(N)} ) ft = xrft.dft(da, shift=False) npt.assert_almost_equal(ft.values, np.fft.fftn(da.values)) with pytest.raises(NotImplementedError): xrft.dft(da, detrend='linear') with pytest.raises(NotImplementedError): xrft.dft(da, dim=['time','y','x'], detrend='linear') da_prime = xrft.detrendn(da[:,0].values, [0,1,2]) # cubic detrend over time, y, and x npt.assert_almost_equal(xrft.dft(da[:,0].drop('z'), dim=['time','y','x'], shift=False, detrend='linear' ).values, np.fft.fftn(da_prime))
def test_dft_3d_dask(): """Test the discrete Fourier transform on 3D dask array data""" N=16 da = xr.DataArray(np.random.rand(N,N,N), dims=['time','x','y'], coords={'time':range(N),'x':range(N), 'y':range(N)} ) daft = xrft.dft(da.chunk({'time': 1}), dim=['x','y'], shift=False) # assert hasattr(daft.data, 'dask') npt.assert_almost_equal(daft.values, np.fft.fftn(da.chunk({'time': 1}).values, axes=[1,2]) ) with pytest.raises(ValueError): xrft.dft(da.chunk({'time': 1, 'x': 1}), dim=['x']) daft = xrft.dft(da.chunk({'x': 1}), dim=['time'], shift=False, detrend='linear') # assert hasattr(daft.data, 'dask') da_prime = sps.detrend(da.chunk({'x': 1}), axis=0) npt.assert_almost_equal(daft.values, np.fft.fftn(da_prime, axes=[0]) )
def test_power_spectrum_dask(): """Test the power spectrum function on dask data""" N = 16 dim = ['x','y'] da = xr.DataArray(np.random.rand(2,N,N), dims=['time','x','y'], coords={'time':range(2),'x':range(N), 'y':range(N)}).chunk({'time': 1} ) ps = xrft.power_spectrum(da, dim=dim, density=False) daft = xrft.dft(da, dim=['x','y']) npt.assert_almost_equal(ps.values, (daft * np.conj(daft)).real.values) ps = xrft.power_spectrum(da, dim=dim, window=True, detrend='constant') daft = xrft.dft(da, dim=dim, window=True, detrend='constant') coord = list(daft.coords) test = (daft * np.conj(daft)).real/N**4 for i in dim: test /= daft['freq_' + i + '_spacing'] npt.assert_almost_equal(ps.values, test) npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
def test_cross_spectrum(): """Test the cross spectrum function""" N = 16 da = xr.DataArray(np.random.rand(N,N), dims=['x','y'], coords={'x':range(N),'y':range(N)} ) da2 = xr.DataArray(np.random.rand(N,N), dims=['x','y'], coords={'x':range(N),'y':range(N)} ) cs = xrft.cross_spectrum(da, da2, window=True, density=False, detrend='constant') daft = xrft.dft(da, dim=None, shift=True, detrend='constant', window=True) daft2 = xrft.dft(da2, dim=None, shift=True, detrend='constant', window=True) npt.assert_almost_equal(cs.values, np.real(daft*np.conj(daft2))) npt.assert_almost_equal(np.ma.masked_invalid(cs).mask.sum(), 0.)
def processData(self, data): try: from scipy.signal import detrend except ImportError: raise Exception("DetrendFilter node requires the package scipy.signal.") return detrend(data)
def segment_by_std_dev(series, increment=2, maximum=20): """ Divides a series into segments, minimizing standard deviation over window size. Windows are of varying size from `increment` to `maximum * increment` at each offset `increment` within the series. :param series: :param increment: :param maximum: :return: """ duration = int(series.index[-2]) windows = [] for i in range(0, duration, increment): for size in range(1, maximum + 1): window = detrend(series[i:i + size*increment]) heappush(windows, (window.std() / (size*increment), i, i + size*increment)) segments = [] spots = set() try: while True: window_agv_std, start, end = heappop(windows) if any(i in spots for i in range(start, int(end))): continue for i in range(start, int(end)): spots.add(int(i)) heappush(segments, (start, min(duration, end))) except IndexError: pass return [heappop(segments) for _ in range(len(segments))]
def get_data(filename, channel=None, start=0, length=None, detrend=None): """Load selected data from HiDAQ recording. :param filename: name of the datafile :param channel: list of channels to read (base 0, None to read all channels) :param start: sample index to start from :param length: number of samples to read (None means read all available samples) :param detrend: processing to be applied to each channel to remove offset/bias (supported values: ``'linear'``, ``'constant'``, ``None``) """ if channel is None: channel = range(_channels) elif isinstance(channel, int): channel = [channel] if length is None: length = get_data_length(filename)-start with open(filename, 'rb') as f: hdr = _np.fromfile(f, dtype=_np.dtype('>i4'), count=1) f.seek(hdr[0]+start*_framelen, _os.SEEK_SET) data = _np.fromfile(f, dtype=_np.dtype('>i2'), count=_channels*length) data = _np.reshape(data, [length,_channels]) data = _np.take(data, channel, axis=1).astype(_np.float) if len(channel) == 1: data = data.ravel() data = data/2048 if detrend is not None: data = _sig.detrend(data, axis=0, type=detrend) return data
def get_data(filename, channel=None, start=0, length=None, detrend='linear'): """Load selected data from DTLA recording. :param filename: name of the datafile :param channel: list of channels to read (base 0, None to read all channels) :param start: sample index to start from :param length: number of samples to read (None means read all available samples) :param detrend: processing to be applied to each channel to remove offset/bias (supported values: ``'linear'``, ``'constant'``, ``None``) """ if channel is None: channel = range(_channels) elif isinstance(channel, int): channel = [channel] if length is None: length = get_data_length(filename)-start with open(filename, 'rb') as f: f.seek(start*_framelen, _os.SEEK_SET) data = _np.fromfile(f, dtype=_np.uint16, count=_framelen/2*length) data = _np.reshape(data, [length,_framelen/2]) data = data[:,2:] data = _np.take(data, channel, axis=1).astype(_np.float) if len(channel) == 1: data = data.ravel() data = 5*data/65536-2.5 if detrend is not None: data = _sig.detrend(data, axis=0, type=detrend) return data
def bandpassfilter(x,fs): """ :param x: a list of samples :param fs: sampling frequency :return: filtered list """ x = signal.detrend(x) b = signal.firls(129,[0,0.6*2/fs,0.7*2/fs,3*2/fs,3.5*2/fs,1],[0,0,1,1,0,0],[100*0.02,0.02,0.02]) return signal.convolve(x,b,'valid')
def detrend_wrap(detrend_func): """ Wrapper function for `xrft.detrendn`. """ def func(a, axes=None): if a.ndim > 4 or len(axes) > 3: raise ValueError("Data has too many dimensions " "and/or too many axes to detrend over.") if axes is None: axes = tuple(range(a.ndim)) else: if len(set(axes)) < len(axes): raise ValueError("Duplicate axes are not allowed.") for each_axis in axes: if len(a.chunks[each_axis]) != 1: raise ValueError('The axis along the detrending is upon ' 'cannot be chunked.') if len(axes) == 1: return dsar.map_blocks(sps.detrend, a, axis=axes[0], chunks=a.chunks, dtype=a.dtype ) else: for each_axis in range(a.ndim): if each_axis not in axes: if len(a.chunks[each_axis]) != a.shape[each_axis]: raise ValueError("The axes other than ones to detrend " "over should have a chunk length of 1.") return dsar.map_blocks(detrend_func, a, axes, chunks=a.chunks, dtype=a.dtype ) return func
def test_dft_1d(test_data_1d): """Test the discrete Fourier transform function on one-dimensional data.""" da = test_data_1d Nx = len(da) dx = float(da.x[1] - da.x[0]) if 'x' in da else 1 # defaults with no keyword args ft = xrft.dft(da, detrend='constant') # check that the frequency dimension was created properly assert ft.dims == ('freq_x',) # check that the coords are correct freq_x_expected = np.fft.fftshift(np.fft.fftfreq(Nx, dx)) npt.assert_allclose(ft['freq_x'], freq_x_expected) # check that a spacing variable was created assert ft['freq_x_spacing'] == freq_x_expected[1] - freq_x_expected[0] # make sure the function is lazy assert isinstance(ft.data, type(da.data)) # check that the Fourier transform itself is correct data = (da - da.mean()).values ft_data_expected = np.fft.fftshift(np.fft.fft(data)) # because the zero frequency component is zero, there is a numerical # precision issue. Fixed by setting atol npt.assert_allclose(ft_data_expected, ft.values, atol=1e-14) # redo without removing mean ft = xrft.dft(da) ft_data_expected = np.fft.fftshift(np.fft.fft(da)) npt.assert_allclose(ft_data_expected, ft.values) # redo with detrending linear least-square fit ft = xrft.dft(da, detrend='linear') da_prime = sps.detrend(da.values) ft_data_expected = np.fft.fftshift(np.fft.fft(da_prime)) npt.assert_allclose(ft_data_expected, ft.values, atol=1e-14) if 'x' in da and not da.chunks: da['x'].values[-1] *= 2 with pytest.raises(ValueError): ft = xrft.dft(da)
def test_isotropic_ps_slope(N=512, dL=1., amp=1e1, s=-3.): """Test the spectral slope of isotropic power spectrum.""" theta = xr.DataArray(_synthetic_field(N, dL, amp, s), dims=['y', 'x'], coords={'y':range(N), 'x':range(N)}) iso_ps = xrft.isotropic_powerspectrum(theta, detrend='constant', density=True) npt.assert_almost_equal(np.ma.masked_invalid(iso_ps[1:]).mask.sum(), 0.) y_fit, a, b = xrft.fit_loglog(iso_ps.freq_r.values[4:], iso_ps.values[4:]) npt.assert_allclose(a, s, atol=.1)
def detrend(x, order=1, axis=-1): """Detrend the array x. Parameters ---------- x : n-d array Signal to detrend. order : int Fit order. Currently must be '0' or '1'. axis : integer Axis of the array to operate on. Returns ------- xf : array x detrended. Examples -------- As in scipy.signal.detrend: >>> randgen = np.random.RandomState(9) >>> npoints = int(1e3) >>> noise = randgen.randn(npoints) >>> x = 3 + 2*np.linspace(0, 1, npoints) + noise >>> (detrend(x) - noise).max() < 0.01 True """ from scipy.signal import detrend if axis > len(x.shape): raise ValueError('x does not have %d axes' % axis) if order == 0: fit = 'constant' elif order == 1: fit = 'linear' else: raise ValueError('order must be 0 or 1') y = detrend(x, axis=axis, type=fit) return y
def plot(self, scale_factor=1.0, plot_ax='x'): time = np.arange(self.delay, (self.n_samples*self.dt + self.delay), self.dt) # Normalize and detrend norm_traces = np.zeros( np.shape(self.timeHistories) ) for k in range(self.n_channels): current_trace = self.timeHistories[:,k] current_trace = signal.detrend(current_trace) # Linear detrend current_trace = current_trace / np.amax(current_trace) # Normalize by max value current_trace = current_trace*scale_factor + self.position[k] # Scale and shift norm_traces[:,k]= current_trace # Plotting if str.lower(plot_ax) == 'y': fig = plt.figure( figsize=(2.75,6) ) ax = fig.add_axes([0.14,0.20,0.8,0.8]) for m in range(self.n_channels): ax.plot(time, norm_traces[:,m],'b-', linewidth=0.5) ax.set_xlim( ( min(time), max(time) ) ) ax.set_ylim( (-self.position[1], self.position[1]+self.position[len(self.position)-1] ) ) ax.set_xticklabels(ax.get_xticks(), fontsize=11, fontname='arial' ) ax.set_yticklabels(ax.get_yticks(), fontsize=11, fontname='arial' ) ax.grid(axis='x', linestyle='--') ax.set_xlabel('Time (s)', fontsize=11, fontname="arial") ax.set_ylabel('Normalized Amplitude', fontsize=11, fontname="arial") ax.tick_params(labelsize=11) ax.tick_params('x', length=4, width=1, which='major') ax.tick_params('y', length=4, width=1, which='major') elif str.lower(plot_ax) == 'x': fig = plt.figure( figsize=(6,2.75) ) ax = fig.add_axes([0.14,0.20,0.8,0.75]) for m in range(self.n_channels): ax.plot(norm_traces[:,m], time, 'b-', linewidth=0.5) ax.set_ylim( ( max(time), min(time) ) ) ax.set_xlim( (-self.position[1], self.position[1]+self.position[len(self.position)-1] ) ) ax.set_yticklabels(ax.get_yticks(), fontsize=11, fontname='arial' ) ax.set_xticklabels(ax.get_xticks(), fontsize=11, fontname='arial' ) ax.grid(axis='y', linestyle='--') ax.set_ylabel('Time (s)', fontsize=11, fontname="arial") ax.set_xlabel('Normalized Amplitude', fontsize=11, fontname="arial") ax.tick_params(labelsize=11) ax.tick_params('y', length=4, width=1, which='major') ax.tick_params('x', length=4, width=1, which='major') return # Method to pad zeros to achieve desired frequency sampling
def maxdelwind(source, target, rate=1, start_time=0., delta_time=1., sample_duration=.5): ''' delay, times = maxdelwid(...) Calculates a time-varying delay function from source (x) to target (y) at intervals delta_time. Parameters: * source: source signal (reuqired) * target: target signal (required) * rate: sampling rate * start_time: starting time for tfe calculations * delta_time: distance between calculations * sample_duration: length of signals used in tfe estimates (longer than window_duration, used in averaging) Returns: * delay: max delay array * times: times corresponding to delay estimates (array size M) ''' # convert time to samples sample_start = int(start_time*rate) sample_delta = int(delta_time*rate) sample_len = int(sample_duration*rate) window = np.ones(sample_len) n_samples = min(len(source), len(target)) sample_end = n_samples - sample_start - sample_len delay = [] corr_strength = [] times = [] for block_start in np.arange(sample_start, sample_end, sample_delta): block_end = block_start + sample_len target_block = sig.detrend(target[block_start:block_end]) source_block = sig.detrend(source[block_start:block_end]) block_del, block_corr = block_delay(target_block, source_block, window=window) times.append((block_start+sample_len/2)/float(rate)) delay.append(block_del/float(rate)) corr_strength.append(block_corr) return np.array(delay), np.array(corr_strength), np.array(times)
def power_spectrum(da, spacing_tol=1e-3, dim=None, shift=True, detrend=None, density=True, window=False): """ Calculates the power spectrum of da. .. math:: da' = da - \overline{da} ps = \mathbb{F}(da') * {\mathbb{F}(da')}^* Parameters ---------- da : `xarray.DataArray` The data to be transformed spacing_tol: float (default) Spacing tolerance. Fourier transform should not be applied to uneven grid but this restriction can be relaxed with this setting. Use caution. dim : list (optional) The dimensions along which to take the transformation. If `None`, all dimensions will be transformed. shift : bool (optional) Whether to shift the fft output. detrend : str (optional) If `constant`, the mean across the transform dimensions will be subtracted before calculating the Fourier transform (FT). If `linear`, the linear least-square fit will be subtracted before the FT. density : list (optional) If true, it will normalize the spectrum to spectral density window : bool (optional) Whether to apply a Hann window to the data before the Fourier transform is taken Returns ------- ps : `xarray.DataArray` Two-dimensional power spectrum """ if dim is None: dim = da.dims # the axes along which to take ffts axis_num = [da.get_axis_num(d) for d in dim] N = [da.shape[n] for n in axis_num] daft = dft(da, spacing_tol, dim=dim, shift=shift, detrend=detrend, window=window) coord = list(daft.coords) ps = (daft * np.conj(daft)).real if density: ps /= (np.asarray(N).prod())**2 for i in dim: ps /= daft['freq_' + i + '_spacing'] return ps
def test_detrend(): N = 16 x = np.arange(N+1) y = np.arange(N-1) t = np.linspace(-int(N/2), int(N/2), N-6) z = np.arange(int(N/2)) d4d = (t[:,np.newaxis,np.newaxis,np.newaxis] + z[np.newaxis,:,np.newaxis,np.newaxis] + y[np.newaxis,np.newaxis,:,np.newaxis] + x[np.newaxis,np.newaxis,np.newaxis,:] ) da4d = xr.DataArray(d4d, dims=['time','z','y','x'], coords={'time':range(len(t)),'z':range(len(z)),'y':range(len(y)), 'x':range(len(x))} ) func = xrft.detrend_wrap(xrft.detrendn) ######### # Chunk along the `time` axis ######### da = da4d.chunk({'time': 1}) with pytest.raises(ValueError): func(da.data, axes=[0]).compute with pytest.raises(ValueError): func(da.data, axes=[0,1,2,3]).compute() da_prime = func(da.data, axes=[2]).compute() npt.assert_allclose(da_prime[0,0], sps.detrend(d4d[0,0], axis=0)) da_prime = func(da.data, axes=[1,2,3]).compute() npt.assert_allclose(da_prime[0], xrft.detrendn(d4d[0], axes=[0,1,2])) ######### # Chunk along the `time` and `z` axes ######### da = da4d.chunk({'time':1, 'z':1}) with pytest.raises(ValueError): func(da.data, axes=[1,2]).compute() with pytest.raises(ValueError): func(da.data, axes=[2,2]).compute() da_prime = func(da.data, axes=[2,3]).compute() npt.assert_allclose(da_prime[0,0], xrft.detrendn(d4d[0,0], axes=[0,1])) s = np.arange(2) d5d = d4d[np.newaxis,:,:,:,:] + s[:,np.newaxis,np.newaxis, np.newaxis,np.newaxis] da5d = xr.DataArray(d5d, dims=['s','time','z','y','x'], coords={'s':range(len(s)),'time':range(len(t)), 'z':range(len(z)),'y':range(len(y)), 'x':range(len(x))} ) da = da5d.chunk({'time':1}) with pytest.raises(ValueError): func(da.data).compute()
def test_power_spectrum(): """Test the power spectrum function""" N = 16 da = xr.DataArray(np.random.rand(N,N), dims=['x','y'], coords={'x':range(N),'y':range(N)} ) ps = xrft.power_spectrum(da, window=True, density=False, detrend='constant') daft = xrft.dft(da, dim=None, shift=True, detrend='constant', window=True) npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft))) npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.) ### Normalized dim = da.dims ps = xrft.power_spectrum(da, window=True, detrend='constant') daft = xrft.dft(da, window=True, detrend='constant') coord = list(daft.coords) test = np.real(daft*np.conj(daft))/N**4 for i in range(len(dim)): test /= daft[coord[-i-1]].values npt.assert_almost_equal(ps.values, test) npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.) ### Remove mean da = xr.DataArray(np.random.rand(5,20,30), dims=['time', 'y', 'x'], coords={'time': np.arange(5), 'y': np.arange(20), 'x': np.arange(30)}) ps = xrft.power_spectrum(da, dim=['y', 'x'], window=True, density=False, detrend='constant' ) daft = xrft.dft(da, dim=['y','x'], window=True, detrend='constant') npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft))) npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.) ### Remove least-square fit da_prime = np.zeros_like(da.values) for t in range(5): da_prime[t] = numpy_detrend(da[t].values) da_prime = xr.DataArray(da_prime, dims=da.dims, coords=da.coords) ps = xrft.power_spectrum(da_prime, dim=['y', 'x'], window=True, density=False, detrend='constant' ) daft = xrft.dft(da_prime, dim=['y','x'], window=True, detrend='constant') npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft))) npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
def test_parseval(): """Test whether the Parseval's relation is satisfied.""" N = 16 da = xr.DataArray(np.random.rand(N,N), dims=['x','y'], coords={'x':range(N), 'y':range(N)}) da2 = xr.DataArray(np.random.rand(N,N), dims=['x','y'], coords={'x':range(N), 'y':range(N)}) dim = da.dims delta_x = [] for d in dim: coord = da[d] diff = np.diff(coord) delta = diff[0] delta_x.append(delta) window = np.hanning(N) * np.hanning(N)[:, np.newaxis] ps = xrft.power_spectrum(da, window=True, detrend='constant') da_prime = da.values - da.mean(dim=dim).values npt.assert_almost_equal(ps.values.sum(), (np.asarray(delta_x).prod() * ((da_prime*window)**2).sum() ), decimal=5 ) cs = xrft.cross_spectrum(da, da2, window=True, detrend='constant') da2_prime = da2.values - da2.mean(dim=dim).values npt.assert_almost_equal(cs.values.sum(), (np.asarray(delta_x).prod() * ((da_prime*window) * (da2_prime*window)).sum() ), decimal=5 ) d3d = xr.DataArray(np.random.rand(N,N,N), dims=['time','y','x'], coords={'time':range(N), 'y':range(N), 'x':range(N)} ).chunk({'time':1}) ps = xrft.power_spectrum(d3d, dim=['x','y'], window=True, detrend='linear') npt.assert_almost_equal(ps[0].values.sum(), (np.asarray(delta_x).prod() * ((numpy_detrend(d3d[0].values)*window)**2).sum() ), decimal=5 )
def process_traces(self, s, h): """ Performs data processing operations on traces """ # remove linear trend #s = _signal.detrend(s) # mute (added by DmBorisov) if PAR.MUTE: vel = PAR.MUTESLOPE off = PAR.T0_TOP inn = PAR.MUTEINNER mute_out = PAR.MUTEOUTER s = smute(s, h, vel, off, inn, mute_out, constant_spacing=False) vel = PAR.MUTESLOPE_BTM off = PAR.T0_BOT s = smutelow(s, h, vel, off, inn, mute_out, constant_spacing=False) # filter data (modified by DmBorisov) if PAR.BANDPASS: s = sbandpass(s, h, PAR.FREQLO, PAR.FREQHI) # scale all traces by a single value (norm) (added by DmBorisov) if PAR.NORMALIZE_ALL: sum_norm = np.linalg.norm(s, ord=2) if sum_norm > 0: s /= sum_norm # mute (added by DmBorisov) if PAR.MUTE: vel = PAR.MUTESLOPE off = PAR.T0_TOP inn = PAR.MUTEINNER mute_out = PAR.MUTEOUTER s = smute(s, h, vel, off, inn, mute_out, constant_spacing=False) vel = PAR.MUTESLOPE_BTM off = PAR.T0_BOT s = smutelow(s, h, vel, off, inn, mute_out, constant_spacing=False) return s