Python scipy.signal 模块,hilbert() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用scipy.signal.hilbert()

项目:DTW_physionet2016    作者:JJGO    | 项目源码 | 文件源码
def homomorphic_envelope(x, fs=1000, f_LPF=8, order=3):
    """
    Computes the homomorphic envelope of x

    Args:
        x : array
        fs : float
            Sampling frequency. Defaults to 1000 Hz
        f_LPF : float
            Lowpass frequency, has to be f_LPF < fs/2. Defaults to 8 Hz
    Returns:
        time : numpy array
    """
    b, a = butter(order, 2 * f_LPF / fs, 'low')
    he = np.exp(filtfilt(b, a, np.log(np.abs(hilbert(x)))))
    return he
项目:untwist    作者:IoSR-Surrey    | 项目源码 | 文件源码
def process(self, wave):
        wave.check_mono()
        if wave.sample_rate != self.sr:
            raise Exception("Wrong sample rate")                              
        n = int(np.ceil(2 * wave.num_frames / float(self.w_len)))
        m = (n + 1) * self.w_len / 2 
        swindow = self.make_signal_window(n)
        win_ratios = [self.window / swindow[t * self.w_len / 2 : 
            t * self.w_len / 2 + self.w_len] 
            for t in range(n)]
        wave = wave.zero_pad(0, int(m - wave.num_frames))
        wave = audio.Wave(signal.hilbert(wave), wave.sample_rate)        
        result = np.zeros((self.n_bins, n))

        for b in range(self.n_bins): 
            w = self.widths[b]
            wc = 1 / np.square(w + 1)
            filter = self.filters[b]
            band = fftfilt(filter, wave.zero_pad(0, int(2 * w))[:,0])
            band = band[int(w) : int(w + m), np.newaxis]    
            for t in range(n):
                frame = band[t * self.w_len / 2:
                             t * self.w_len / 2 + self.w_len,:] * win_ratios[t]
                result[b, t] =  wc * np.real(np.conj(np.dot(frame.conj().T, frame)))
        return audio.Spectrogram(result, self.sr, self.w_len, self.w_len / 2)
项目:speech_feature_extractor    作者:ZhihaoDU    | 项目源码 | 文件源码
def ams_extractor(x, sr, win_len, shift_len, order):
    from scipy.signal import hilbert
    envelope = np.abs(hilbert(x))
    for i in range(order-1):
        envelope = np.abs(hilbert(envelope))
    envelope = envelope * 1./3.
    frames = (len(envelope) - win_len) // shift_len
    hanning_window = np.hanning(win_len)
    ams_feature = np.zeros(shape=(15, frames))
    wts = cal_triangle_window(0, sr//2, win_len, 15, 15.6, 400)
    for i in range(frames):
        one_frame = x[i*shift_len:i*shift_len+win_len]
        one_frame = one_frame * hanning_window
        frame_fft = np.abs(np.fft.fft(one_frame, win_len))
        ams_feature[:,i] = np.matmul(wts, frame_fft)
    return ams_feature
项目:speech_feature_extractor    作者:ZhihaoDU    | 项目源码 | 文件源码
def ams_extractor(x, sr, win_len, shift_len, order=1, decimate_coef=1./4.):
    from scipy.signal import hilbert
    envelope = np.abs(hilbert(x))
    for i in range(order-1):
        envelope = np.abs(hilbert(envelope))
    envelope = envelope * decimate_coef
    frames = (len(envelope) - win_len) // shift_len
    hanning_window = np.hanning(win_len)
    ams_feature = np.zeros(shape=(15, frames))
    wts = cal_triangle_window(0, sr//2, win_len, 15, 15.6, 400)
    for i in range(frames):
        one_frame = x[i*shift_len:i*shift_len+win_len]
        one_frame = one_frame * hanning_window
        frame_fft = np.abs(np.fft.fft(one_frame, win_len))
        ams_feature[:,i] = np.matmul(wts, frame_fft)
    return ams_feature
项目:tensorpac    作者:EtienneCmb    | 项目源码 | 文件源码
def __init__(self, idpac=(1, 1, 3), fpha=[2, 4], famp=[60, 200],
                 dcomplex='hilbert', filt='fir1', cycle=(3, 6), filtorder=3,
                 width=7, nbins=18, nblocks=2):
        """Check and initialize."""
        # Initialize visualization methods :
        PacPlot.__init__(self)
        # ----------------- CHECKING -----------------
        # Pac methods :
        self._idcheck(idpac)
        # Frequency checking :
        self.fpha, self.famp = pac_vec(fpha, famp)
        self.xvec, self.yvec = self.fpha.mean(1), self.famp.mean(1)

        # Check spectral properties :
        self._speccheck(filt, dcomplex, filtorder, cycle, width)

        # ----------------- SELF -----------------
        self.nbins, self.nblocks = int(nbins), int(nblocks)
项目:leeds_seismology_programs    作者:georgetaylor3152    | 项目源码 | 文件源码
def autocorr(trace):
    """This function takes an obspy trace object and performs a phase autocorrelation
    of the trace with itself. First, the hilbert transform is taken to obtain the 
    analytic signal and hence the instantaneous phase. This is then passed to a 
    fortran script where phase correlation is performed after Schimmel et al., 2011.

    This function relies on the shared object file phasecorr.so, which is the file
    containing the fortran subroutine for phase correlation.
    """

    import numpy as np

    from scipy.signal import hilbert
    from phasecorr import phasecorr
    # Hilbert transform to obtain the analytic signal
    htrans = hilbert(trace)
    # Perform phase autocorrelation with instantaneous phase
    pac = phasecorr(np.angle(htrans),np.angle(htrans),len(htrans))

    return pac
项目:leeds_seismology_programs    作者:georgetaylor3152    | 项目源码 | 文件源码
def crosscorr(tr1,tr2):
    """This function takes 2 obspy traces and performs a phase crosscorrelation
    of the traces. First, the hilbert transform is taken to obtain the 
    analytic signal and hence the instantaneous phase. This is then passed to a 
    fortran script where phase correlation is performed after Schimmel et al., 2011.

    This function relies on the shared object file phasecorr.so, which is the file
    containing the fortran subroutine for phase correlation.
    """

    import numpy as np

    from scipy.signal import hilbert
    from phasecorr import phasecorr
    # Hilbert transform to obtain the analytic signal
    htrans1 = hilbert(tr1)
    htrans2 = hilbert(tr2)
    # Perform phase autocorrelation with instantaneous phase
    pcc = phasecorr(np.angle(htrans1),np.angle(htrans2),len(htrans1))

    return pcc
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def InstantaneousPhase(syn, obs, nt, dt, eps=0.05):
    # instantaneous phase 
    # (Bozdag et al 2011, eq 27)
    r = _np.real(_analytic(syn))
    i = _np.imag(_analytic(syn))
    phi_syn = _np.arctan2(i,r)

    r = _np.real(_analytic(obs))
    i = _np.imag(_analytic(obs))
    phi_obs = _np.arctan2(i,r)

    phi_rsd = phi_syn - phi_obs
    esyn = abs(_analytic(syn))
    emax = max(esyn**2.)

    wadj = phi_rsd*_np.imag(_analytic(syn))/(esyn**2. + eps*emax) + \
           _np.imag(_analytic(phi_rsd * syn/(esyn**2. + eps*emax)))

    return wadj
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def AnalyticSignal(syn, obs, nt, dt, eps=0.):
    esyn = abs(_analytic(syn))
    eobs = abs(_analytic(obs))

    esyn1 = esyn + eps*max(esyn)
    eobs1 = eobs + eps*max(eobs)
    esyn3 = esyn**3 + eps*max(esyn**3)

    diff1 = syn/(esyn1) - obs/(eobs1)
    diff2 = _hilbert(syn)/esyn1 - _hilbert(obs)/eobs1

    part1 = diff1*_hilbert(syn)**2/esyn3 - diff2*syn*_hilbert(syn)/esyn3
    part2 = diff1*syn*_hilbert(syn)/esyn3 - diff2*syn**2/esyn3

    wadj = part1 + _hilbert(part2)
    return wadj
项目:psola    作者:jcreinhold    | 项目源码 | 文件源码
def __hilbert(x):
    """
    Hilbert transform to the power of 2

    Args:
        x (array): numpy array of data

    Returns:
        y (array): numpy array of Hilbert transformed x
                     (same size as x)
    """
    l = x.size
    pad = int(2**(np.floor(np.log2(l)) + 1))
    y = signal.hilbert(x, N=pad)[:l]
    return y
项目:untwist    作者:IoSR-Surrey    | 项目源码 | 文件源码
def process(self, wave, W):
        wave.check_mono()
        if wave.sample_rate != self.sr: raise Exception("Wrong sample rate")
        n = int(np.ceil(2 * wave.num_frames / float(self.w_len)))
        m = (n + 1) * self.w_len / 2
        if n != W.shape[1]: raise Exception("Wrong size for W")
        swindow = self.make_signal_window(n)
        win_ratios = [self.window / swindow[t * self.w_len / 2 :
            t * self.w_len / 2 + self.w_len]
            for t in range(n)]
        wave = wave.zero_pad(0, int((n + 1) * self.w_len / 2.0 - wave.num_frames))
        wave = audio.Wave(signal.hilbert(wave), wave.sample_rate)
        result = np.zeros(wave.shape)

        for b in range(self.n_bins):    
            w = self.widths[b]
            wc = 1/np.square(w + 1)
            filter = 1/w * self.filters[b]
            band = fftfilt(filter, wave.zero_pad(0, int(2 * w))[:,0])
            band = band[int(w) : int(w + (n + 1) * self.w_len / 2), np.newaxis]
            out_band = audio.Wave(np.zeros(band.shape, np.complex128), wave.sample_rate)
            for t in range(n):
                start = int(t * self.w_len / 2)
                end = int(t * self.w_len / 2 + self.w_len)
                frame = band[start:end,:] * win_ratios[t]**2
                out_band[start:end,:] = out_band[start:end,:] + frame * W[b,t]
            out_band = np.real(fftfilt(filter, out_band.zero_pad(0, int(2 * w))[:,0]))
            result[:,0] = result[:,0] + self.weights[b] * out_band[int(w): int(w + m)]
        return result
项目:tensorpac    作者:EtienneCmb    | 项目源码 | 文件源码
def _speccheck(self, filt=None, dcomplex=None, filtorder=None, cycle=None,
                   width=None):
        """Check spectral parameters."""
        # Check the filter name :
        if filt is not None:
            if filt not in ['fir1', 'butter', 'bessel']:
                raise ValueError("filt must either be 'fir1', 'butter' or "
                                 "'bessel'")
            else:
                self._filt = filt
        # Check cycle :
        if cycle is not None:
            cycle = np.asarray(cycle)
            if (len(cycle) is not 2) or not cycle.dtype == int:
                raise ValueError("Cycle must be a tuple of two integers.")
            else:
                self._cycle = cycle
        # Check complex decomposition :
        if dcomplex is not None:
            if dcomplex not in ['hilbert', 'wavelet']:
                raise ValueError("dcomplex must either be 'hilbert' or "
                                 "'wavelet'.")
            else:
                self._dcomplex = dcomplex
        # Convert filtorder :
        if filtorder is not None:
            self._filtorder = int(filtorder)
        # Convert Morlet's width :
        if width is not None:
            self._width = int(width)
项目:brainpipe    作者:EtienneCmb    | 项目源码 | 文件源码
def _getTransform(sf, f, npts, method, wltWidth, *arg):
    """Return a fuction which contain a transformation
    - 'hilbert'
    - 'hilbert1'
    - 'hilbert2'
    - 'wavelet'
    """
    # Get the design of the filter :
    fDesign = _getFiltDesign(sf, f, npts, *arg)

    # Hilbert method
    if method == 'hilbert':
        def hilb(x):
            xH = np.zeros(x.shape)*1j
            xF = fDesign(x)
            for k in range(x.shape[1]):
                xH[:, k] = hilbert(xF[:, k])
            return xH
        return hilb

    # Hilbert method 1
    elif method == 'hilbert1':
        def hilb1(x): return hilbert(fDesign(x))
        return hilb1

    # Hilbert method 2
    elif method == 'hilbert2':
        def hilb2(x): return hilbert2(fDesign(x))
        return hilb2

    # Wavelet method
    elif method == 'wavelet':
        def wav(x): return morlet(x, sf, (f[0]+f[1])/2, wavelet_width=wltWidth)
        return wav

    # Filter the signal
    elif method == 'filter':
        def fm(x): return fDesign(x)
        return fm
项目:arlpy    作者:org-arl    | 项目源码 | 文件源码
def test_envelope(self):
        x = np.random.normal(0, 1, 1000)
        self.assertArrayEqual(signal.envelope(x), np.abs(sp.hilbert(x)))
项目:arlpy    作者:org-arl    | 项目源码 | 文件源码
def envelope(x):
    """Generate a Hilbert envelope of the real signal x."""
    return _np.abs(_sig.hilbert(x, axis=0))
项目:DTW_physionet2016    作者:JJGO    | 项目源码 | 文件源码
def hilbert_transform(x):
    """
    Computes modulus of the complex valued
    hilbert transform of x
    """
    return np.abs(hilbert(x))
项目:pactools    作者:pactools    | 项目源码 | 文件源码
def phase_amplitude(signals, phase=True, amplitude=True):
    """Extract instantaneous phase and amplitude with Hilbert transform"""
    # one dimension array
    if signals.ndim == 1:
        signals = signals[None, :]
        one_dim = True
    elif signals.ndim == 2:
        one_dim = False
    else:
        raise ValueError('Impossible to compute phase_amplitude with ndim ='
                         ' %s.' % (signals.ndim, ))

    n_epochs, n_points = signals.shape
    n_fft = compute_n_fft(signals)

    sig_phase = np.empty(signals.shape) if phase else None
    sig_amplitude = np.empty(signals.shape) if amplitude else None
    for i, sig in enumerate(signals):
        sig_complex = hilbert(sig, n_fft)[:n_points]

        if phase:
            sig_phase[i] = np.angle(sig_complex)
        if amplitude:
            sig_amplitude[i] = np.abs(sig_complex)

    # one dimension array
    if one_dim:
        if phase:
            sig_phase = sig_phase[0]
        if amplitude:
            sig_amplitude = sig_amplitude[0]

    return sig_phase, sig_amplitude
项目:pactools    作者:pactools    | 项目源码 | 文件源码
def crop_for_fast_hilbert(signals):
    """Crop the signal to have a good prime decomposition, for hilbert filter.
    """
    if signals.ndim < 2:
        tmax = signals.shape[0]
        while prime_factors(tmax)[-1] > 20:
            tmax -= 1
        return signals[:tmax]
    else:
        tmax = signals.shape[1]
        while prime_factors(tmax)[-1] > 20:
            tmax -= 1
        return signals[:, :tmax]
项目:decoding_challenge_cortana_2016_3rd    作者:kingjr    | 项目源码 | 文件源码
def _my_hilbert(x, n_fft=None, envelope=False):
    """ Compute Hilbert transform of signals w/ zero padding.

    Parameters
    ----------
    x : array, shape (n_times)
        The signal to convert
    n_fft : int, length > x.shape[-1] | None
        How much to pad the signal before Hilbert transform.
        Note that signal will then be cut back to original length.
    envelope : bool
        Whether to compute amplitude of the hilbert transform in order
        to return the signal envelope.

    Returns
    -------
    out : array, shape (n_times)
        The hilbert transform of the signal, or the envelope.
    """
    from scipy.signal import hilbert
    n_fft = x.shape[-1] if n_fft is None else n_fft
    n_x = x.shape[-1]
    out = hilbert(x, N=n_fft)[:n_x]
    if envelope is True:
        out = np.abs(out)
    return out
项目:decoding_challenge_cortana_2016_3rd    作者:kingjr    | 项目源码 | 文件源码
def _compute_normalized_phase(data):
    """Compute normalized phase angles

    Parameters
    ----------
    data : ndarray, shape (n_epochs, n_sources, n_times)
        The data to compute the phase angles for.

    Returns
    -------
    phase_angles : ndarray, shape (n_epochs, n_sources, n_times)
        The normalized phase angles.
    """
    from scipy.signal import hilbert
    return (np.angle(hilbert(data)) + np.pi) / (2 * np.pi)
项目:GRIPy    作者:giruenf    | 项目源码 | 文件源码
def __init__(self, real_signal, sampling):
        self._fs = 1/sampling
        self._analytic_signal =  hilbert(real_signal)           
        self._amplitude_envelope = np.abs(self._analytic_signal)
        self._instantaneous_phase = np.unwrap(np.angle(self._analytic_signal))
        self._instantaneous_frequency = (np.diff(self._instantaneous_phase) / 
                                        (2.0*np.pi) * self._fs)
        self._instantaneous_frequency = np.insert(self._instantaneous_frequency, 0, np.nan)
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def Envelope(syn, obs, nt, dt, eps=0.05):
    # envelope difference
    # (Yuan et al 2015, eq 16)
    esyn = abs(_analytic(syn))
    eobs = abs(_analytic(obs))
    if esyn.all() == 0.0:
        etmp = 0.0
    else:
        etmp = (esyn - eobs)/(esyn + eps*esyn.max())

    wadj = etmp*syn - _np.imag(_analytic(etmp*_np.imag(_analytic(syn))))
    return wadj
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def Envelope3(syn, obs, nt, dt, eps=0.):
    # envelope lag
    # (Yuan et al 2015, eqs B-2, B-5)
    esyn = abs(_analytic(syn))
    eobs = abs(_analytic(obs))

    erat = _np.zeros(nt)
    erat[1:-1] = (esyn[2:] - esyn[0:-2])/(2.*dt)
    erat[1:-1] /= esyn[1:-1]
    erat *= misfit.Envelope3(syn, obs, nt, dt)

    wadj = -erat*syn + _hilbert(erat*_hilbert(esyn))
    return wadj
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def Envelope(syn, obs, nt, dt, eps=0.05):
    # envelope difference
    # (Yuan et al 2015, eq 9)
    esyn = abs(_analytic(syn))
    eobs = abs(_analytic(obs))
    ersd = esyn-eobs
    return _np.sqrt(_np.sum(ersd*ersd*dt))
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def Envelope2(syn, obs, nt, dt, eps=0.):
    # envelope amplitude ratio
    # (Yuan et al 2015, eq B-1)
    esyn = abs(_analytic(syn))
    eobs = abs(_analytic(obs))
    raise NotImplementedError
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def Envelope3(syn, obs, nt, dt, eps=0.):
    # envelope cross-correlation lag
    # (Yuan et al 2015, eqs B-4)
    esyn = abs(_analytic(syn))
    eobs = abs(_analytic(obs))
    return Traveltime(esyn, eobs, nt, dt)
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def AnalyticSignal(syn, obs, nt, dt, eps=0.):
    esyn = abs(_analytic(syn))
    eobs = abs(_analytic(obs))

    esyn1 = esyn + eps*max(esyn)
    eobs1 = eobs + eps*max(eobs)

    diff = syn/esyn1 - obs/eobs1

    return _np.sqrt(_np.sum(diff*diff*dt))
项目:SeisFlows_tunnel    作者:DmBorisov    | 项目源码 | 文件源码
def hilbert(w):
    return np.imag(analytic(w))
项目:tensorpac    作者:EtienneCmb    | 项目源码 | 文件源码
def spectral(x, sf, f, axis, stype, dcomplex, filt, filtorder, cycle, width,
             njobs):
    """Extract spectral informations from data.

    Parameters
    ----------
    x : array_like
        Array of data

    sf : float
        Sampling frequency

    f : array_like
        Frequency vector of shape (N, 2)

    axis : int
        Axis where the time is located.

    stype : string
        Spectral informations to extract (use either 'pha' or 'amp')

    dcomplex : string
        Complex decomposition type. Use either 'hilbert' or 'wavelet'

    filt : string
        Name of the filter to use (only if dcomplex is 'hilbert'). Use
        either 'eegfilt', 'butter' or 'bessel'.

    filtorder : int
        Order of the filter (only if dcomplex is 'hilbert')

    cycle : int
        Number of cycles to use for fir1 filtering.

    width : int
        Width of the wavelet.

    njobs : int
        Number of jobs to use. If jobs is -1, all of them are going to be
        used.
    """
    # Filtering + complex decomposition :
    if dcomplex is 'hilbert':
        # Filt each time series :
        nf = range(f.shape[0])
        xf = Parallel(n_jobs=njobs)(delayed(filtdata)(
            x, sf, f[k, :], axis, filt, cycle, filtorder) for k in nf)
        # Use hilbert for the complex decomposition :
        xd = hilbert(xf, axis=axis + 1) if stype is not None else np.array(xf)
    elif dcomplex is 'wavelet':
        f = f.mean(1)  # centered frequencies
        xd = Parallel(n_jobs=njobs)(delayed(morlet)(
            x, sf, k, axis, width) for k in f)

    # Extract phase / amplitude :
    if stype is 'pha':
        return np.angle(np.moveaxis(xd, axis + 1, -1))
    elif stype is 'amp':
        return np.abs(np.moveaxis(xd, axis + 1, -1))
    elif stype is None:
        return np.moveaxis(xd, axis + 1, -1)
项目:tensorpac    作者:EtienneCmb    | 项目源码 | 文件源码
def filter(self, sf, x, axis=-1, ftype='phase', keepfilt=False, njobs=-1):
        """Filt the data in the specified frequency bands.

        Parameters
        ----------
        sf: float
            The sampling frequency.

        x: array_like
            Array of data.

        axis : int | -1
            Location of the time axis.

        ftype : {'phase', 'amplitude'}
            Specify if you want to extract phase ('phase') or the amplitude
            ('amplitude').

        njobs : int | -1
            Number of jobs to compute PAC in parallel. For very large data,
            set this parameter to 1 in order to prevent large memory usage.

        keepfilt : bool | False
            Specify if you only want the filtered data (True). This parameter
            is only avaible with dcomplex='hilbert' and not wavelet.

        Returns
        -------
        xfilt : array_like
            The filtered data of shape (n_frequency, ...)
        """
        # Sampling frequency :
        if not isinstance(sf, (int, float)):
            raise ValueError("The sampling frequency must be a float number.")
        else:
            sf = float(sf)
        # Compatibility between keepfilt and wavelet :
        if (keepfilt is True) and (self._dcomplex is 'wavelet'):
            raise ValueError("Using wavelet for the complex decomposition do "
                             "not allow to get filtered data only. Set the "
                             "keepfilt parameter to False or set dcomplex to "
                             "'hilbert'.")
        # 1D signals :
        if x.ndim == 1:
            x = x.reshape(1, -1)
            axis = 1
        # Switch between phase or amplitude :
        if ftype is 'phase':
            tosend = 'pha' if not keepfilt else None
            xfilt = spectral(x, sf, self.fpha, axis, tosend, self._dcomplex,
                             self._filt, self._filtorder, self._cycle[0],
                             self._width, njobs)
        elif ftype is 'amplitude':
            tosend = 'amp' if not keepfilt else None
            xfilt = spectral(x, sf, self.famp, axis, tosend, self._dcomplex,
                             self._filt, self._filtorder, self._cycle[1],
                             self._width, njobs)
        else:
            raise ValueError("ftype must either be None, 'phase' or "
                             "'amplitude.'")
        return xfilt
项目:brainpipe    作者:EtienneCmb    | 项目源码 | 文件源码
def _cfcFiltSuro(xPha, xAmp, surJob, self):
    """SUP: Get the cfc and surrogates

    The function return:
        - The unormalized cfc
        - All the surrogates (for pvalue)
        - The mean of surrogates (for normalization)
        - The deviation of surrogates (for normalization)
    """
    # Check input variables :
    npts, ntrial = xPha.shape
    W = self._window
    nwin = len(W)

    # Get the filter for phase/amplitude properties :
    phaMeth = self._pha.get(self._sf, self._pha.f, self._npts)
    ampMeth = self._amp.get(self._sf, self._amp.f, self._npts)

    # Filt the phase and amplitude :
    xPha = self._pha.apply(xPha, phaMeth)
    xAmp = self._amp.apply(xAmp, ampMeth)

    # Extract phase of amplitude for PLV method:
    if self.Id[0] in ['4']:
        for a in range(xAmp.shape[0]):
            for t in range(xAmp.shape[2]):
                xAmp[a, :, t] = np.angle(hilbert(np.ravel(xAmp[a, :, t])))

    # 2D loop trick :
    claIdx, listWin, listTrial = list2index(nwin, ntrial)

    # Get the unormalized cfc :
    uCfc = [_cfcGet(np.squeeze(xPha[:, W[k[0]][0]:W[k[0]][1], k[1]]),
                    np.squeeze(xAmp[:, W[k[0]][0]:W[k[0]][1], k[1]]),
                    self.Id, self._nbins) for k in claIdx]
    uCfc = np.array(groupInList(uCfc, listWin))

    # Run surogates on each window :
    if (self.n_perm != 0) and (self.Id[0] is not '5') and (self.Id[1] is not '0'):
        Suro = Parallel(n_jobs=surJob)(delayed(_cfcGetSuro)(
            xPha[:, k[0]:k[1], :], xAmp[:, k[0]:k[1], :],
            self.Id, self.n_perm, self._nbins, self._matricial) for k in self._window)
        mSuro = [np.mean(k, 3) for k in Suro]
        stdSuro = [np.std(k, 3) for k in Suro]
    else:
        Suro, mSuro, stdSuro = None, None, None

    return uCfc, Suro, mSuro, stdSuro
项目:nelpy    作者:nelpy    | 项目源码 | 文件源码
def signal_envelope1D(data, *, sigma=None, fs=None):
    """Docstring goes here

    TODO: this is not yet epoch-aware!

    sigma = 0 means no smoothing (default 4 ms)
    """

    if sigma is None:
        sigma = 0.004   # 4 ms standard deviation
    if fs is None:
        if isinstance(data, (np.ndarray, list)):
            raise ValueError("sampling frequency must be specified!")
        elif isinstance(data, core.AnalogSignalArray):
            fs = data.fs

    if isinstance(data, (np.ndarray, list)):
        # Compute number of samples to compute fast FFTs
        padlen = nextfastpower(len(data)) - len(data)
        # Pad data
        paddeddata = np.pad(data, (0, padlen), 'constant')
        # Use hilbert transform to get an envelope
        envelope = np.absolute(hilbert(paddeddata))
        # Truncate results back to original length
        envelope = envelope[:len(data)]
        if sigma:
            # Smooth envelope with a gaussian (sigma = 4 ms default)
            EnvelopeSmoothingSD = sigma*fs
            smoothed_envelope = scipy.ndimage.filters.gaussian_filter1d(envelope, EnvelopeSmoothingSD, mode='constant')
            envelope = smoothed_envelope
    elif isinstance(data, core.AnalogSignalArray):
        newasa = copy.deepcopy(data)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            cum_lengths = np.insert(np.cumsum(data.lengths), 0, 0)

        # for segment in data:
        for idx in range(data.n_epochs):
            # print('hilberting epoch {}/{}'.format(idx+1, data.n_epochs))
            segment_data = data._ydata[:,cum_lengths[idx]:cum_lengths[idx+1]]
            n_signals, n_samples = segment_data.shape
            assert n_signals == 1, 'only 1D signals supported!'
            # Compute number of samples to compute fast FFTs:
            padlen = nextfastpower(n_samples) - n_samples
            # Pad data
            paddeddata = np.pad(segment_data.squeeze(), (0, padlen), 'constant')
            # Use hilbert transform to get an envelope
            envelope = np.absolute(hilbert(paddeddata))
            # free up memory
            del paddeddata
            # Truncate results back to original length
            envelope = envelope[:n_samples]
            if sigma:
                # Smooth envelope with a gaussian (sigma = 4 ms default)
                EnvelopeSmoothingSD = sigma*fs
                smoothed_envelope = scipy.ndimage.filters.gaussian_filter1d(envelope, EnvelopeSmoothingSD, mode='constant')
                envelope = smoothed_envelope
            newasa._ydata[:,cum_lengths[idx]:cum_lengths[idx+1]] = np.atleast_2d(envelope)
        return newasa
    return envelope
项目:yam    作者:trichter    | 项目源码 | 文件源码
def time_norm(data, method, clip_factor=1, clip_set_zero=False,
              mute_parts=48, mute_factor=2):
    """
    Calculate normalized data, see e.g. Bensen et al. (2007)

    :param data: numpy array with data to manipulate
    :param str method:
        1bit: reduce data to +1 if >0 and -1 if <0\n
        clip: clip data to the root mean square (rms)\n
        mute_envelope: calculate envelope and set data to zero where envelope
        is larger than specified
    :param float clip_factor: multiply std with this value before cliping
    :param bool clip_mask: instead of clipping, set the values to zero and mask
        them
    :param int mute_parts: mean of the envelope is calculated by dividing the
        envelope into several parts, the mean calculated in each part and
        the median of this averages defines the mean envelope
    :param float mute_factor: mean of envelope multiplied by this
        factor defines the level for muting

    :return: normalized data
    """
    mask = np.ma.getmask(data)
    if method == '1bit':
        data = np.sign(data)
    elif method == 'clip':
        std = np.std(data)
        args = (data < -clip_factor * std, data > clip_factor * std)
        if clip_set_zero:
            ind = np.logical_or(*args)
            data[ind] = 0
        else:
            np.clip(data, *args, out=data)
    elif method == 'mute_envelope':
        N = next_fast_len(len(data))
        envelope = np.abs(hilbert(data, N))[:len(data)]
        levels = [np.mean(d) for d in np.array_split(envelope, mute_parts)]
        level = mute_factor * np.median(levels)
        data[envelope > level] = 0
    else:
        msg = 'The method passed to time_norm is not known: %s.' % method
        raise ValueError(msg)
    return _fill_array(data, mask=mask, fill_value=0.)


# http://azitech.wordpress.com/
# 2011/03/15/designing-a-butterworth-low-pass-filter-with-scipy/