Python scipy.signal 模块,detrend() 实例源码

我们从Python开源项目中,提取了以下23个代码示例,用于说明如何使用scipy.signal.detrend()

项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def _apply_detrend(da, axis_num):
    """Wrapper function for applying detrending"""
    if da.chunks:
        func = detrend_wrap(detrendn)
        da = xr.DataArray(func(da.data, axes=axis_num),
                        dims=da.dims, coords=da.coords)
    else:
        if da.ndim == 1:
            da = xr.DataArray(sps.detrend(da),
                            dims=da.dims, coords=da.coords)
        else:
            da = detrendn(da, axes=axis_num)
        # else:
        #     raise ValueError("Data should be dask array.")

    return da
项目:brainpipe    作者:EtienneCmb    | 项目源码 | 文件源码
def _apply_method(x, fMeth, dtrd, method, wltCorr, wltWidth):
    npts, ntrial = x.shape
    nFce = len(fMeth)
    xf = np.zeros((nFce, npts, ntrial))

    # Detrend the signal :
    if dtrd:
        x = detrend(x, axis=0)

    # Apply methods :
    for k in range(nFce):
        xf[k, ...] = fMeth[k](x)

    # Correction for the wavelet (due to the wavelet width):
    if (method == 'wavelet') and (wltCorr is not None):
        w = 3*wltWidth
        xf[:, 0:w, :] = xf[:, w+1:2*w+1, :]
        xf[:, npts-w:npts, :] = xf[:, npts-2*w-1:npts-w-1, :]

    return xf
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_dft_4d():
    """Test the discrete Fourier transform on 2D data"""
    N = 16
    da = xr.DataArray(np.random.rand(N,N,N,N),
                    dims=['time','z','y','x'],
                    coords={'time':range(N),'z':range(N),
                            'y':range(N),'x':range(N)}
                     )
    ft = xrft.dft(da, shift=False)
    npt.assert_almost_equal(ft.values, np.fft.fftn(da.values))

    with pytest.raises(NotImplementedError):
        xrft.dft(da, detrend='linear')
    with pytest.raises(NotImplementedError):
        xrft.dft(da, dim=['time','y','x'], detrend='linear')

    da_prime = xrft.detrendn(da[:,0].values, [0,1,2]) # cubic detrend over time, y, and x
    npt.assert_almost_equal(xrft.dft(da[:,0].drop('z'),
                                    dim=['time','y','x'],
                                    shift=False, detrend='linear'
                                    ).values,
                            np.fft.fftn(da_prime))
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_dft_3d_dask():
    """Test the discrete Fourier transform on 3D dask array data"""
    N=16
    da = xr.DataArray(np.random.rand(N,N,N), dims=['time','x','y'],
                      coords={'time':range(N),'x':range(N),
                              'y':range(N)}
                     )
    daft = xrft.dft(da.chunk({'time': 1}), dim=['x','y'], shift=False)
    # assert hasattr(daft.data, 'dask')
    npt.assert_almost_equal(daft.values,
                        np.fft.fftn(da.chunk({'time': 1}).values, axes=[1,2])
                           )

    with pytest.raises(ValueError):
        xrft.dft(da.chunk({'time': 1, 'x': 1}), dim=['x'])

    daft = xrft.dft(da.chunk({'x': 1}), dim=['time'],
                    shift=False, detrend='linear')
    # assert hasattr(daft.data, 'dask')
    da_prime = sps.detrend(da.chunk({'x': 1}), axis=0)
    npt.assert_almost_equal(daft.values,
                        np.fft.fftn(da_prime, axes=[0])
                           )
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_power_spectrum_dask():
    """Test the power spectrum function on dask data"""
    N = 16
    dim = ['x','y']
    da = xr.DataArray(np.random.rand(2,N,N), dims=['time','x','y'],
                      coords={'time':range(2),'x':range(N),
                              'y':range(N)}).chunk({'time': 1}
                     )
    ps = xrft.power_spectrum(da, dim=dim, density=False)
    daft = xrft.dft(da, dim=['x','y'])
    npt.assert_almost_equal(ps.values, (daft * np.conj(daft)).real.values)

    ps = xrft.power_spectrum(da, dim=dim, window=True, detrend='constant')
    daft = xrft.dft(da, dim=dim, window=True, detrend='constant')
    coord = list(daft.coords)
    test = (daft * np.conj(daft)).real/N**4
    for i in dim:
        test /= daft['freq_' + i + '_spacing']
    npt.assert_almost_equal(ps.values, test)
    npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_cross_spectrum():
    """Test the cross spectrum function"""
    N = 16
    da = xr.DataArray(np.random.rand(N,N), dims=['x','y'],
                    coords={'x':range(N),'y':range(N)}
                     )
    da2 = xr.DataArray(np.random.rand(N,N), dims=['x','y'],
                    coords={'x':range(N),'y':range(N)}
                     )
    cs = xrft.cross_spectrum(da, da2, window=True, density=False,
                            detrend='constant')
    daft = xrft.dft(da,
                    dim=None, shift=True, detrend='constant',
                    window=True)
    daft2 = xrft.dft(da2,
                    dim=None, shift=True, detrend='constant',
                    window=True)
    npt.assert_almost_equal(cs.values, np.real(daft*np.conj(daft2)))
    npt.assert_almost_equal(np.ma.masked_invalid(cs).mask.sum(), 0.)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def processData(self, data):
        try:
            from scipy.signal import detrend
        except ImportError:
            raise Exception("DetrendFilter node requires the package scipy.signal.")
        return detrend(data)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def processData(self, data):
        try:
            from scipy.signal import detrend
        except ImportError:
            raise Exception("DetrendFilter node requires the package scipy.signal.")
        return detrend(data)
项目:croissance    作者:biosustain    | 项目源码 | 文件源码
def segment_by_std_dev(series, increment=2, maximum=20):
    """
    Divides a series into segments, minimizing standard deviation over window size. Windows are of varying size from
    `increment` to `maximum * increment` at each offset `increment` within the series.

    :param series:
    :param increment:
    :param maximum:
    :return:
    """
    duration = int(series.index[-2])
    windows = []

    for i in range(0, duration, increment):
        for size in range(1, maximum + 1):
            window = detrend(series[i:i + size*increment])
            heappush(windows, (window.std() / (size*increment), i, i + size*increment))

    segments = []
    spots = set()

    try:
        while True:
            window_agv_std, start, end = heappop(windows)

            if any(i in spots for i in range(start, int(end))):
                continue

            for i in range(start, int(end)):
                spots.add(int(i))

            heappush(segments, (start, min(duration, end)))

    except IndexError:
        pass

    return [heappop(segments) for _ in range(len(segments))]
项目:arlpy    作者:org-arl    | 项目源码 | 文件源码
def get_data(filename, channel=None, start=0, length=None, detrend=None):
    """Load selected data from HiDAQ recording.

    :param filename: name of the datafile
    :param channel: list of channels to read (base 0, None to read all channels)
    :param start: sample index to start from
    :param length: number of samples to read (None means read all available samples)
    :param detrend: processing to be applied to each channel to remove offset/bias
                    (supported values: ``'linear'``, ``'constant'``, ``None``)
    """
    if channel is None:
        channel = range(_channels)
    elif isinstance(channel, int):
        channel = [channel]
    if length is None:
        length = get_data_length(filename)-start
    with open(filename, 'rb') as f:
        hdr = _np.fromfile(f, dtype=_np.dtype('>i4'), count=1)
        f.seek(hdr[0]+start*_framelen, _os.SEEK_SET)
        data = _np.fromfile(f, dtype=_np.dtype('>i2'), count=_channels*length)
    data = _np.reshape(data, [length,_channels])
    data = _np.take(data, channel, axis=1).astype(_np.float)
    if len(channel) == 1:
        data = data.ravel()
    data = data/2048
    if detrend is not None:
        data = _sig.detrend(data, axis=0, type=detrend)
    return data
项目:arlpy    作者:org-arl    | 项目源码 | 文件源码
def get_data(filename, channel=None, start=0, length=None, detrend='linear'):
    """Load selected data from DTLA recording.

    :param filename: name of the datafile
    :param channel: list of channels to read (base 0, None to read all channels)
    :param start: sample index to start from
    :param length: number of samples to read (None means read all available samples)
    :param detrend: processing to be applied to each channel to remove offset/bias
                    (supported values: ``'linear'``, ``'constant'``, ``None``)
    """
    if channel is None:
        channel = range(_channels)
    elif isinstance(channel, int):
        channel = [channel]
    if length is None:
        length = get_data_length(filename)-start
    with open(filename, 'rb') as f:
        f.seek(start*_framelen, _os.SEEK_SET)
        data = _np.fromfile(f, dtype=_np.uint16, count=_framelen/2*length)
    data = _np.reshape(data, [length,_framelen/2])
    data = data[:,2:]
    data = _np.take(data, channel, axis=1).astype(_np.float)
    if len(channel) == 1:
        data = data.ravel()
    data = 5*data/65536-2.5
    if detrend is not None:
        data = _sig.detrend(data, axis=0, type=detrend)
    return data
项目:CerebralCortex-2.0-legacy    作者:MD2Korg    | 项目源码 | 文件源码
def bandpassfilter(x,fs):
    """

    :param x: a list of samples 
    :param fs: sampling frequency
    :return: filtered list
    """
    x = signal.detrend(x)
    b = signal.firls(129,[0,0.6*2/fs,0.7*2/fs,3*2/fs,3.5*2/fs,1],[0,0,1,1,0,0],[100*0.02,0.02,0.02])
    return signal.convolve(x,b,'valid')
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def detrend_wrap(detrend_func):
    """
    Wrapper function for `xrft.detrendn`.
    """
    def func(a, axes=None):
        if a.ndim > 4 or len(axes) > 3:
            raise ValueError("Data has too many dimensions "
                            "and/or too many axes to detrend over.")
        if axes is None:
            axes = tuple(range(a.ndim))
        else:
            if len(set(axes)) < len(axes):
                raise ValueError("Duplicate axes are not allowed.")

        for each_axis in axes:
            if len(a.chunks[each_axis]) != 1:
                raise ValueError('The axis along the detrending is upon '
                                'cannot be chunked.')

        if len(axes) == 1:
            return dsar.map_blocks(sps.detrend, a, axis=axes[0],
                                   chunks=a.chunks, dtype=a.dtype
                                  )
        else:
            for each_axis in range(a.ndim):
                if each_axis not in axes:
                    if len(a.chunks[each_axis]) != a.shape[each_axis]:
                        raise ValueError("The axes other than ones to detrend "
                                        "over should have a chunk length of 1.")
            return dsar.map_blocks(detrend_func, a, axes,
                                   chunks=a.chunks, dtype=a.dtype
                                  )

    return func
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_dft_1d(test_data_1d):
    """Test the discrete Fourier transform function on one-dimensional data."""
    da = test_data_1d
    Nx = len(da)
    dx = float(da.x[1] - da.x[0]) if 'x' in da else 1

    # defaults with no keyword args
    ft = xrft.dft(da, detrend='constant')
    # check that the frequency dimension was created properly
    assert ft.dims == ('freq_x',)
    # check that the coords are correct
    freq_x_expected = np.fft.fftshift(np.fft.fftfreq(Nx, dx))
    npt.assert_allclose(ft['freq_x'], freq_x_expected)
    # check that a spacing variable was created
    assert ft['freq_x_spacing'] == freq_x_expected[1] - freq_x_expected[0]
    # make sure the function is lazy
    assert isinstance(ft.data, type(da.data))
    # check that the Fourier transform itself is correct
    data = (da - da.mean()).values
    ft_data_expected = np.fft.fftshift(np.fft.fft(data))
    # because the zero frequency component is zero, there is a numerical
    # precision issue. Fixed by setting atol
    npt.assert_allclose(ft_data_expected, ft.values, atol=1e-14)


    # redo without removing mean
    ft = xrft.dft(da)
    ft_data_expected = np.fft.fftshift(np.fft.fft(da))
    npt.assert_allclose(ft_data_expected, ft.values)

    # redo with detrending linear least-square fit
    ft = xrft.dft(da, detrend='linear')
    da_prime = sps.detrend(da.values)
    ft_data_expected = np.fft.fftshift(np.fft.fft(da_prime))
    npt.assert_allclose(ft_data_expected, ft.values, atol=1e-14)

    if 'x' in da and not da.chunks:
        da['x'].values[-1] *= 2
        with pytest.raises(ValueError):
            ft = xrft.dft(da)
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_isotropic_ps_slope(N=512, dL=1., amp=1e1, s=-3.):
    """Test the spectral slope of isotropic power spectrum."""

    theta = xr.DataArray(_synthetic_field(N, dL, amp, s),
                        dims=['y', 'x'],
                        coords={'y':range(N), 'x':range(N)})
    iso_ps = xrft.isotropic_powerspectrum(theta, detrend='constant',
                                        density=True)
    npt.assert_almost_equal(np.ma.masked_invalid(iso_ps[1:]).mask.sum(), 0.)
    y_fit, a, b = xrft.fit_loglog(iso_ps.freq_r.values[4:],
                                iso_ps.values[4:])

    npt.assert_allclose(a, s, atol=.1)
项目:decoding_challenge_cortana_2016_3rd    作者:kingjr    | 项目源码 | 文件源码
def detrend(x, order=1, axis=-1):
    """Detrend the array x.

    Parameters
    ----------
    x : n-d array
        Signal to detrend.
    order : int
        Fit order. Currently must be '0' or '1'.
    axis : integer
        Axis of the array to operate on.

    Returns
    -------
    xf : array
        x detrended.

    Examples
    --------
    As in scipy.signal.detrend:
        >>> randgen = np.random.RandomState(9)
        >>> npoints = int(1e3)
        >>> noise = randgen.randn(npoints)
        >>> x = 3 + 2*np.linspace(0, 1, npoints) + noise
        >>> (detrend(x) - noise).max() < 0.01
        True
    """
    from scipy.signal import detrend
    if axis > len(x.shape):
        raise ValueError('x does not have %d axes' % axis)
    if order == 0:
        fit = 'constant'
    elif order == 1:
        fit = 'linear'
    else:
        raise ValueError('order must be 0 or 1')

    y = detrend(x, axis=axis, type=fit)

    return y
项目:PyMASWdisp    作者:dpteague    | 项目源码 | 文件源码
def plot(self, scale_factor=1.0, plot_ax='x'):
        time = np.arange(self.delay, (self.n_samples*self.dt + self.delay), self.dt)
        # Normalize and detrend
        norm_traces = np.zeros( np.shape(self.timeHistories) )
        for k in range(self.n_channels):
            current_trace = self.timeHistories[:,k]
            current_trace = signal.detrend(current_trace)                   # Linear detrend
            current_trace = current_trace / np.amax(current_trace)          # Normalize by max value
            current_trace = current_trace*scale_factor + self.position[k]   # Scale and shift
            norm_traces[:,k]= current_trace

        # Plotting
        if str.lower(plot_ax) == 'y':
            fig = plt.figure( figsize=(2.75,6) )
            ax = fig.add_axes([0.14,0.20,0.8,0.8])
            for m in range(self.n_channels):
                ax.plot(time, norm_traces[:,m],'b-', linewidth=0.5)
            ax.set_xlim( ( min(time), max(time) ) )
            ax.set_ylim( (-self.position[1], self.position[1]+self.position[len(self.position)-1] ) )
            ax.set_xticklabels(ax.get_xticks(), fontsize=11, fontname='arial' )
            ax.set_yticklabels(ax.get_yticks(), fontsize=11, fontname='arial' )
            ax.grid(axis='x', linestyle='--')
            ax.set_xlabel('Time (s)', fontsize=11, fontname="arial")
            ax.set_ylabel('Normalized Amplitude', fontsize=11, fontname="arial")
            ax.tick_params(labelsize=11)
            ax.tick_params('x', length=4, width=1, which='major')
            ax.tick_params('y', length=4, width=1, which='major')
        elif str.lower(plot_ax) == 'x':
            fig = plt.figure( figsize=(6,2.75) )
            ax = fig.add_axes([0.14,0.20,0.8,0.75])
            for m in range(self.n_channels):
                ax.plot(norm_traces[:,m], time, 'b-', linewidth=0.5)
            ax.set_ylim( ( max(time), min(time) ) )
            ax.set_xlim( (-self.position[1], self.position[1]+self.position[len(self.position)-1] ) ) 
            ax.set_yticklabels(ax.get_yticks(), fontsize=11, fontname='arial' )
            ax.set_xticklabels(ax.get_xticks(), fontsize=11, fontname='arial' ) 
            ax.grid(axis='y', linestyle='--') 
            ax.set_ylabel('Time (s)', fontsize=11, fontname="arial")
            ax.set_xlabel('Normalized Amplitude', fontsize=11, fontname="arial")
            ax.tick_params(labelsize=11)
            ax.tick_params('y', length=4, width=1, which='major')
            ax.tick_params('x', length=4, width=1, which='major')
        return 

    # Method to pad zeros to achieve desired frequency sampling
项目:PyPeVoc    作者:goiosunsw    | 项目源码 | 文件源码
def maxdelwind(source, target, rate=1, start_time=0., delta_time=1.,
               sample_duration=.5):
    '''
    delay, times = maxdelwid(...)

    Calculates a time-varying delay function from source (x)
    to target (y) at intervals delta_time.

    Parameters:
    * source: source signal (reuqired)
    * target: target signal (required)
    * rate:   sampling rate
    * start_time: starting time for tfe calculations
    * delta_time: distance between calculations
    * sample_duration: length of signals used in tfe estimates
                       (longer than window_duration, used in averaging)
    Returns:
    * delay: max delay array
    * times: times corresponding to delay estimates (array size M)
    '''

    # convert time to samples
    sample_start = int(start_time*rate)
    sample_delta = int(delta_time*rate)
    sample_len = int(sample_duration*rate)

    window = np.ones(sample_len)

    n_samples = min(len(source), len(target))
    sample_end = n_samples - sample_start - sample_len

    delay = []
    corr_strength = []
    times = []

    for block_start in np.arange(sample_start, sample_end, sample_delta):
        block_end = block_start + sample_len
        target_block = sig.detrend(target[block_start:block_end])
        source_block = sig.detrend(source[block_start:block_end])
        block_del, block_corr = block_delay(target_block, source_block,
                                            window=window)
        times.append((block_start+sample_len/2)/float(rate))
        delay.append(block_del/float(rate))
        corr_strength.append(block_corr)

    return np.array(delay), np.array(corr_strength), np.array(times)
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def power_spectrum(da, spacing_tol=1e-3, dim=None, shift=True, detrend=None, density=True,
                window=False):
    """
    Calculates the power spectrum of da.

    .. math::

     da' = da - \overline{da}
     ps = \mathbb{F}(da') * {\mathbb{F}(da')}^*

    Parameters
    ----------
    da : `xarray.DataArray`
        The data to be transformed
    spacing_tol: float (default)
        Spacing tolerance. Fourier transform should not be applied to uneven grid but 
        this restriction can be relaxed with this setting. Use caution.
    dim : list (optional)
        The dimensions along which to take the transformation. If `None`, all
        dimensions will be transformed.
    shift : bool (optional)
        Whether to shift the fft output.
    detrend : str (optional)
        If `constant`, the mean across the transform dimensions will be
        subtracted before calculating the Fourier transform (FT).
        If `linear`, the linear least-square fit will be subtracted before
        the FT.
    density : list (optional)
        If true, it will normalize the spectrum to spectral density
    window : bool (optional)
        Whether to apply a Hann window to the data before the Fourier
        transform is taken

    Returns
    -------
    ps : `xarray.DataArray`
        Two-dimensional power spectrum
    """

    if dim is None:
        dim = da.dims

    # the axes along which to take ffts
    axis_num = [da.get_axis_num(d) for d in dim]

    N = [da.shape[n] for n in axis_num]

    daft = dft(da, spacing_tol,
            dim=dim, shift=shift, detrend=detrend,
            window=window)

    coord = list(daft.coords)

    ps = (daft * np.conj(daft)).real

    if density:
        ps /= (np.asarray(N).prod())**2
        for i in dim:
            ps /= daft['freq_' + i + '_spacing']

    return ps
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_detrend():
    N = 16
    x = np.arange(N+1)
    y = np.arange(N-1)
    t = np.linspace(-int(N/2), int(N/2), N-6)
    z = np.arange(int(N/2))
    d4d = (t[:,np.newaxis,np.newaxis,np.newaxis]
            + z[np.newaxis,:,np.newaxis,np.newaxis]
            + y[np.newaxis,np.newaxis,:,np.newaxis]
            + x[np.newaxis,np.newaxis,np.newaxis,:]
          )
    da4d = xr.DataArray(d4d, dims=['time','z','y','x'],
                     coords={'time':range(len(t)),'z':range(len(z)),'y':range(len(y)),
                             'x':range(len(x))}
                     )

    func = xrft.detrend_wrap(xrft.detrendn)

    #########
    # Chunk along the `time` axis
    #########
    da = da4d.chunk({'time': 1})
    with pytest.raises(ValueError):
        func(da.data, axes=[0]).compute
    with pytest.raises(ValueError):
        func(da.data, axes=[0,1,2,3]).compute()
    da_prime = func(da.data, axes=[2]).compute()
    npt.assert_allclose(da_prime[0,0], sps.detrend(d4d[0,0], axis=0))
    da_prime = func(da.data, axes=[1,2,3]).compute()
    npt.assert_allclose(da_prime[0],
                        xrft.detrendn(d4d[0], axes=[0,1,2]))

    #########
    # Chunk along the `time` and `z` axes
    #########
    da = da4d.chunk({'time':1, 'z':1})
    with pytest.raises(ValueError):
        func(da.data, axes=[1,2]).compute()
    with pytest.raises(ValueError):
        func(da.data, axes=[2,2]).compute()
    da_prime = func(da.data, axes=[2,3]).compute()
    npt.assert_allclose(da_prime[0,0],
                        xrft.detrendn(d4d[0,0], axes=[0,1]))

    s = np.arange(2)
    d5d = d4d[np.newaxis,:,:,:,:] + s[:,np.newaxis,np.newaxis,
                                        np.newaxis,np.newaxis]
    da5d = xr.DataArray(d5d, dims=['s','time','z','y','x'],
                     coords={'s':range(len(s)),'time':range(len(t)),
                             'z':range(len(z)),'y':range(len(y)),
                             'x':range(len(x))}
                       )
    da = da5d.chunk({'time':1})
    with pytest.raises(ValueError):
        func(da.data).compute()
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_power_spectrum():
    """Test the power spectrum function"""
    N = 16
    da = xr.DataArray(np.random.rand(N,N), dims=['x','y'],
                    coords={'x':range(N),'y':range(N)}
                     )
    ps = xrft.power_spectrum(da, window=True, density=False,
                            detrend='constant')
    daft = xrft.dft(da,
                    dim=None, shift=True, detrend='constant',
                    window=True)
    npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft)))
    npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)

    ### Normalized
    dim = da.dims
    ps = xrft.power_spectrum(da, window=True, detrend='constant')
    daft = xrft.dft(da, window=True, detrend='constant')
    coord = list(daft.coords)
    test = np.real(daft*np.conj(daft))/N**4
    for i in range(len(dim)):
        test /= daft[coord[-i-1]].values
    npt.assert_almost_equal(ps.values, test)
    npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)

    ### Remove mean
    da = xr.DataArray(np.random.rand(5,20,30),
                  dims=['time', 'y', 'x'],
                  coords={'time': np.arange(5),
                        'y': np.arange(20), 'x': np.arange(30)})
    ps = xrft.power_spectrum(da, dim=['y', 'x'],
                            window=True, density=False, detrend='constant'
                            )
    daft = xrft.dft(da, dim=['y','x'], window=True, detrend='constant')
    npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft)))
    npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)

    ### Remove least-square fit
    da_prime = np.zeros_like(da.values)
    for t in range(5):
        da_prime[t] = numpy_detrend(da[t].values)
    da_prime = xr.DataArray(da_prime, dims=da.dims, coords=da.coords)
    ps = xrft.power_spectrum(da_prime, dim=['y', 'x'],
                            window=True, density=False, detrend='constant'
                            )
    daft = xrft.dft(da_prime, dim=['y','x'], window=True, detrend='constant')
    npt.assert_almost_equal(ps.values, np.real(daft*np.conj(daft)))
    npt.assert_almost_equal(np.ma.masked_invalid(ps).mask.sum(), 0.)
项目:xrft    作者:rabernat    | 项目源码 | 文件源码
def test_parseval():
    """Test whether the Parseval's relation is satisfied."""

    N = 16
    da = xr.DataArray(np.random.rand(N,N),
                    dims=['x','y'], coords={'x':range(N), 'y':range(N)})
    da2 = xr.DataArray(np.random.rand(N,N),
                    dims=['x','y'], coords={'x':range(N), 'y':range(N)})

    dim = da.dims
    delta_x = []
    for d in dim:
        coord = da[d]
        diff = np.diff(coord)
        delta = diff[0]
        delta_x.append(delta)

    window = np.hanning(N) * np.hanning(N)[:, np.newaxis]
    ps = xrft.power_spectrum(da, window=True, detrend='constant')
    da_prime = da.values - da.mean(dim=dim).values
    npt.assert_almost_equal(ps.values.sum(),
                            (np.asarray(delta_x).prod()
                            * ((da_prime*window)**2).sum()
                            ), decimal=5
                            )

    cs = xrft.cross_spectrum(da, da2, window=True, detrend='constant')
    da2_prime = da2.values - da2.mean(dim=dim).values
    npt.assert_almost_equal(cs.values.sum(),
                            (np.asarray(delta_x).prod()
                            * ((da_prime*window)
                            * (da2_prime*window)).sum()
                            ), decimal=5
                            )

    d3d = xr.DataArray(np.random.rand(N,N,N),
                    dims=['time','y','x'],
                    coords={'time':range(N), 'y':range(N), 'x':range(N)}
                      ).chunk({'time':1})
    ps = xrft.power_spectrum(d3d, dim=['x','y'], window=True, detrend='linear')
    npt.assert_almost_equal(ps[0].values.sum(),
                            (np.asarray(delta_x).prod()
                            * ((numpy_detrend(d3d[0].values)*window)**2).sum()
                            ), decimal=5
                           )
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def process_traces(self, s, h):
        """ Performs data processing operations on traces
        """
        # remove linear trend
        #s = _signal.detrend(s)

        # mute (added by DmBorisov) 
        if PAR.MUTE:
            vel = PAR.MUTESLOPE
            off = PAR.T0_TOP
            inn = PAR.MUTEINNER
            mute_out = PAR.MUTEOUTER
            s = smute(s, h, vel, off, inn, mute_out, constant_spacing=False)
            vel = PAR.MUTESLOPE_BTM
            off = PAR.T0_BOT
            s = smutelow(s, h, vel, off, inn, mute_out, constant_spacing=False)


        # filter data (modified by DmBorisov)
        if PAR.BANDPASS:
            s = sbandpass(s, h, PAR.FREQLO, PAR.FREQHI)


        # scale all traces by a single value (norm) (added by DmBorisov)
        if PAR.NORMALIZE_ALL:
        sum_norm = np.linalg.norm(s, ord=2)
            if sum_norm > 0:
                s /= sum_norm


        # mute (added by DmBorisov) 
        if PAR.MUTE:
            vel = PAR.MUTESLOPE
            off = PAR.T0_TOP
            inn = PAR.MUTEINNER
            mute_out = PAR.MUTEOUTER
            s = smute(s, h, vel, off, inn, mute_out, constant_spacing=False)
            vel = PAR.MUTESLOPE_BTM
            off = PAR.T0_BOT
            s = smutelow(s, h, vel, off, inn, mute_out, constant_spacing=False)

        return s