我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用scipy.signal.fftconvolve()。
def freq_from_autocorr(sig, fs): """ Estimate frequency using autocorrelation """ # Calculate autocorrelation (same thing as convolution, but with # one input reversed in time), and throw away the negative lags corr = fftconvolve(sig, sig[::-1], mode='full') corr = corr[len(corr)//2:] # Find the first low point d = diff(corr) start = find(d > 0)[0] # Find the next peak after the low point (other than 0 lag). This bit is # not reliable for long signals, due to the desired peak occurring between # samples, and other peaks appearing higher. # Should use a weighting function to de-emphasize the peaks at longer lags. peak = argmax(corr[start:]) + start px, py = parabolic(corr, peak) return fs / px
def sincinterp(x): """ Sinc interpolation for computation of fractional transformations. As appears in : -https://github.com/audiolabs/frft/ ---------- Args: f : (array) Complex valued input array a : (float) Alpha factor Returns: ret : (array) Real valued synthesised data """ N = len(x) y = np.zeros(2 * N - 1, dtype=x.dtype) y[:2 * N:2] = x xint = fftconvolve( y[:2 * N], np.sinc(np.arange(-(2 * N - 3), (2 * N - 2)).T / 2),) return xint[2 * N - 3: -2 * N + 3]
def direct(self, sigin): """ apply this filter to a signal sigin : input signal (ndarray) returns the filtered signal (ndarray) """ fftconvolve = signal.fftconvolve filtered = fftconvolve(sigin.ravel(), self.fir, 'same') if self.extract_complex: filtered_imag = fftconvolve(sigin.ravel(), self.fir_imag, 'same') if sigin.ndim == 2: filtered = filtered[None, :] if self.extract_complex: filtered_imag = filtered_imag[None, :] if self.extract_complex: return filtered, filtered_imag else: return filtered
def transform(self, sigin): """Apply this filter to a signal Parameters ---------- sigin : array, shape (n_points, ) or (n_signals, n_points) Input signal Returns ------- filtered : array, shape (n_points, ) or (n_signals, n_points) Filtered signal """ sigin_ndim = sigin.ndim sigin = np.atleast_2d(sigin) filtered = [signal.fftconvolve(sig, self.fir, 'same') for sig in sigin] if sigin_ndim == 1: filtered = filtered[0] else: filtered = np.asarray(filtered) return filtered
def cell_fd_conv(cell_df, h144=None): Limg, Lx, Ly = cell_fd_info(cell_df) if h144 is None: h144 = get_h2d(Lx, Ly, l=405, z=0.5, dx=2.2/4, dy=2.2/4) cell_img_fd_l = [] for l in range(Limg): cell_img = cell_df[cell_df["ID"] == l]["image"].values.reshape(Lx, Ly) #cell_img_fd = fd_conv(cell_img, h144) cell_img_fd = fftconvolve(cell_img, h144, mode='same') cell_img_fd_l.append(cell_img_fd) cell_img_fd_a = np.array(cell_img_fd_l) #print( cell_img_fd_a.shape) return cell_img_fd_a
def mask_init(self): x = np.linspace(-10,10,21) X, Y = np.meshgrid(x,x) R = X**2 + Y**2 O = np.exp(-R/2/(4**2)) OO = O/np.sum(O) D = sg.fftconvolve(1.0*self.image.mask.data+0.0, OO,'same') # D = pyfftw.interfaces.scipy_fftpack.convolve(1.0*self.image.mask.data+0.0, OO,'same') self.target = copy.deepcopy(self.image.mask.data) self.maskdata = 0.99*D + 0.01 AA = 2*self.maskdata - 1 AA = np.complex64(AA) BB = np.arccos(AA) self.masktheta = BB.real self.image.mask.data = self.maskdata
def simulate(self, p, tindex=None, dt=1): """Simulates the head contribution. Parameters ---------- p: 1D array Parameters used for simulation. tindex: pandas.Series, optional Time indices to simulate the model. Returns ------- pandas.Series The simulated head contribution. """ b = self.rfunc.block(p, dt) stress = self.stress[0] self.npoints = stress.index.size h = pd.Series(fftconvolve(stress, b, 'full')[:self.npoints], index=stress.index, name=self.name) if tindex is not None: h = h[tindex] return h
def blur_image(self, save=False, show=False): if self.part is None: psf = self.PSFs else: psf = [self.PSFs[self.part]] yN, xN, channel = self.shape key, kex = self.PSFs[0].shape delta = yN - key assert delta >= 0, 'resolution of image should be higher than kernel' result=[] if len(psf) > 1: for p in psf: tmp = np.pad(p, delta // 2, 'constant') cv2.normalize(tmp, tmp, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F) # blured = np.zeros(self.shape) blured = cv2.normalize(self.original, self.original, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F) blured[:, :, 0] = np.array(signal.fftconvolve(blured[:, :, 0], tmp, 'same')) blured[:, :, 1] = np.array(signal.fftconvolve(blured[:, :, 1], tmp, 'same')) blured[:, :, 2] = np.array(signal.fftconvolve(blured[:, :, 2], tmp, 'same')) blured = cv2.normalize(blured, blured, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F) blured = cv2.cvtColor(blured, cv2.COLOR_RGB2BGR) result.append(np.abs(blured)) else: psf = psf[0] tmp = np.pad(psf, delta // 2, 'constant') cv2.normalize(tmp, tmp, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F) blured = cv2.normalize(self.original, self.original, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F) blured[:, :, 0] = np.array(signal.fftconvolve(blured[:, :, 0], tmp, 'same')) blured[:, :, 1] = np.array(signal.fftconvolve(blured[:, :, 1], tmp, 'same')) blured[:, :, 2] = np.array(signal.fftconvolve(blured[:, :, 2], tmp, 'same')) blured = cv2.normalize(blured, blured, alpha=0, beta=1, norm_type=cv2.NORM_MINMAX, dtype=cv2.CV_32F) blured = cv2.cvtColor(blured, cv2.COLOR_RGB2BGR) result.append(np.abs(blured)) self.result = result if show or save: self.__plot_canvas(show, save)
def convolve(self, win): " Convolves each channel with window win." from scipy.signal import fftconvolve def conv_func(x): return np.column_stack([fftconvolve(x[:, i], win) for i in range(x.shape[1])]) return self.new_stream(self.vector_map(conv_func))
def test_convolve(): win = [.1, 0, 3] for data in (data2, data3, data4): x = np.column_stack([fftconvolve(data[:, i], win) for i in range(data.shape[1])]) y = Stream(data, sr=1).convolve(win).call() assert eq(x, y)
def _cross_corr(img1, img2=None): ''' Compute the cross correlation of one (or two) images. Parameters ---------- img1 : np.ndarray the image or curve to cross correlate img2 : 1d or 2d np.ndarray, optional If set, cross correlate img1 against img2. A shift of img2 to the right of img1 will lead to a shift of the point of highest correlation to the right. Default is set to None ''' ndim = img1.ndim if img2 is None: img2 = img1 if img1.shape != img2.shape: errorstr = "Image shapes don't match. " errorstr += "(img1 : {},{}; img2 : {},{})"\ .format(*img1.shape, *img2.shape) raise ValueError(errorstr) # need to reverse indices for second image # fftconvolve(A,B) = FFT^(-1)(FFT(A)*FFT(B)) # but need FFT^(-1)(FFT(A(x))*conj(FFT(B(x)))) = FFT^(-1)(A(x)*B(-x)) reverse_index = [slice(None, None, -1) for i in range(ndim)] imgc = fftconvolve(img1, img2[reverse_index], mode='same') return imgc
def smoothLine(data,kernel): """helper function parallelized by smooth""" return signal.fftconvolve(data,kernel, mode='same')
def place_text(self, text_arrs, back_arr, bbs): areas = [-np.prod(ta.shape) for ta in text_arrs] order = np.argsort(areas) locs = [None for i in range(len(text_arrs))] out_arr = np.zeros_like(back_arr) for i in order: ba = np.clip(back_arr.copy().astype(np.float), 0, 255) ta = np.clip(text_arrs[i].copy().astype(np.float), 0, 255) ba[ba > 127] = 1e8 intersect = ssig.fftconvolve(ba,ta[::-1,::-1],mode='valid') safemask = intersect < 1e8 if not np.any(safemask): # no collision-free position: #warn("COLLISION!!!") return back_arr,locs[:i],bbs[:i],order[:i] minloc = np.transpose(np.nonzero(safemask)) loc = minloc[np.random.choice(minloc.shape[0]),:] locs[i] = loc # update the bounding-boxes: bbs[i] = move_bb(bbs[i],loc[::-1]) # blit the text onto the canvas w,h = text_arrs[i].shape out_arr[loc[0]:loc[0]+w,loc[1]:loc[1]+h] += text_arrs[i] return out_arr, locs, bbs, order
def _SSIMForMultiScale(img1, img2, max_val=255, filter_size=11, filter_sigma=1.5, k1=0.01, k2=0.03): img1 = img1.astype(np.float64) img2 = img2.astype(np.float64) _, height, width, _ = img1.shape size = min(filter_size, height, width) sigma = size * filter_sigma / filter_size if filter_size else 0 if filter_size: window = np.reshape(_FSpecialGauss(size, sigma), (1, size, size, 1)) mu1 = signal.fftconvolve(img1, window, mode='valid') mu2 = signal.fftconvolve(img2, window, mode='valid') sigma11 = signal.fftconvolve(img1 * img1, window, mode='valid') sigma22 = signal.fftconvolve(img2 * img2, window, mode='valid') sigma12 = signal.fftconvolve(img1 * img2, window, mode='valid') else: mu1, mu2 = img1, img2 sigma11 = img1 * img1 sigma22 = img2 * img2 sigma12 = img1 * img2 mu11 = mu1 * mu1 mu22 = mu2 * mu2 mu12 = mu1 * mu2 sigma11 -= mu11 sigma22 -= mu22 sigma12 -= mu12 c1 = (k1 * max_val) ** 2 c2 = (k2 * max_val) ** 2 v1 = 2.0 * sigma12 + c2 v2 = sigma11 + sigma22 + c2 ssim = np.mean((((2.0 * mu12 + c1) * v1) / ((mu11 + mu22 + c1) * v2))) cs = np.mean(v1 / v2) return ssim, cs
def _denoise1d(self, M, window_size, order, deriv, rate): try: window_size = np.abs(np.int(window_size)) order = np.abs(np.int(order)) except ValueError as msg: raise ValueError("in SavitzkyGolay.denoise_spectra(), window_size and order have to be of type int") if window_size % 2 != 1 or window_size < 1: raise TypeError("in SavitzkyGolay.denoise_spectra(), window_size size must be a positive odd number") if window_size < order + 2: raise TypeError("in SavitzkyGolay.denoise_spectra(), window_size is too small for the polynomials order") order_range = range(order+1) half_window = (window_size -1) // 2 # precompute coefficients b = np.mat([[k**i for i in order_range] for k in range(-half_window, half_window+1)]) m = np.linalg.pinv(b).A[deriv] * rate**deriv * factorial(deriv) # pad the signal at the extremes with # values taken from the signal itself N, p = M.shape dn = np.ones((N,p), dtype=np.float) long_signal = np.ndarray(p+2, dtype=np.float) for i in range(N): y = M[i] firstvals = y[0] - np.abs( y[1:half_window+1][::-1] - y[0] ) lastvals = y[-1] + np.abs(y[-half_window-1:-1][::-1] - y[-1]) long_signal = np.concatenate((firstvals, y, lastvals)) dn[i] = fftconvolve(long_signal, m, mode='valid') return dn
def _fftconvolve(image, blur_width, blur_height): kernel_width = 10 * int(_np.round(blur_width)) + 1 kernel_height = 10 * int(_np.round(blur_height)) + 1 kernel_y = _signal.gaussian(kernel_height, blur_height) kernel_x = _signal.gaussian(kernel_width, blur_width) kernel = _np.outer(kernel_y, kernel_x) kernel /= kernel.sum() return _signal.fftconvolve(image, kernel, mode='same')
def generate_signal(n, p, loops, SNR_dB=100, noise='white', h=None): # First generate a random signal if noise == 'pink': x = noise_pink(n, rows=loops, alpha=1e-10) elif noise == 'ar1': x = noise_ar1(n, rows=loops) else: x = noise_white(n, rows=loops) # Generate random filters on the sphere if h is None: h = np.random.randn(loops,p) norm = np.linalg.norm(h, axis=1) h = (h.T/norm).T if h.ndim == 1: if h.shape[0] >= p: h = np.tile(h[:p], (loops,1)) else: h2 = np.zeros(loops,p) for i in xrange(loops): h2[i,:h.shape[0]] = h h = h2 # Finally generate the filtered signal sigma_noise = 10.**(-SNR_dB/20.) d = np.zeros((loops,n+h.shape[1]-1)) for l in xrange(loops): d[l,:] = fftconvolve(x[l], h[l]) d[l,:] += np.random.randn(n+h.shape[1]-1)*sigma_noise return x, h, d
def noise_ar1(n, rows=1, a1=0.9): x = noise_white(n, rows=rows) for row in x: row[:] = fftconvolve(row, np.array([1., a1]), mode='same') x = (x.T/np.sqrt(np.mean(x**2, axis=1))).T return x
def _remove_far_masked_data(self, mask, list_signals): """Remove unnecessary data which is masked and far (> self.ordar) from the unmasked data. """ if mask is None: return list_signals selection = ~mask # convolution with a delay kernel, # so we keep the close points before the selection kernel = np.ones(self.ordar * 2 + 1) kernel[-self.ordar:] = 0. delayed_selection = fftconvolve(selection, kernel[None, :], mode='same') # remove numerical error from fftconvolve delayed_selection[np.abs(delayed_selection) < 1e-13] = 0. time_selection = delayed_selection.sum(axis=0) != 0 epoch_selection = delayed_selection.sum(axis=1) != 0 if not np.any(time_selection) or not np.any(epoch_selection): raise ValueError("The mask seems to hide everything.") output_signals = [] for sig in list_signals: if sig is not None: sig = sig[..., epoch_selection, :] sig = sig[..., :, time_selection] output_signals.append(sig) return output_signals
def inverse(self, sigin): """Apply the inverse ARMA filter to a signal sigin : input signal (ndarray) returns the filtered signal(ndarray) """ arpart = np.concatenate((np.ones(1), self.AR_)) return signal.fftconvolve(sigin, arpart, 'same')
def fd_conv(Img_xy, h2d, mode ='same'): #return convolve2d(Img_xy, h2d, mode=mode) return fftconvolve(Img_xy, h2d, mode=mode)
def simulate(self, p, tindex=None, dt=1): """Simulates the head contribution. Parameters ---------- p: 1D array Parameters used for simulation. tindex: pandas.Series, optional Time indices to simulate the model. Returns ------- pandas.Series The simulated head contribution. """ b = self.rfunc.block(p[:-1], dt) self.npoints = self.stress[0].index.size stress = self.get_stress(p=p) h = pd.Series(fftconvolve(stress, b, 'full')[:self.npoints], index=self.stress[0].index, name=self.name) if tindex is not None: h = h[tindex] # see whether it makes a difference to subtract gain * mean_stress # h -= self.rfunc.gain(p) * stress.mean() return h
def simulate(self, p, tindex=None, dt=1): dt = int(dt) b = self.rfunc.block(p[:-self.recharge.nparam], dt) # Block response # The recharge calculation needs arrays precip_array = np.array(self.stress["prec"]) evap_array = np.array(self.stress["evap"]) rseries = self.recharge.simulate(precip_array, evap_array, p[-self.recharge.nparam:]) self.npoints = len(rseries) h = pd.Series(fftconvolve(rseries, b, 'full')[:self.npoints], index=self.stress["prec"].index, name=self.name) if tindex is not None: h = h[tindex] return h
def simulate(self, p=None, tindex=None, dt=1): h = pd.Series(data=0, index=self.stress[0].index, name=self.name) for i in self.stress: self.npoints = self.stress.index.size b = self.rfunc.block(p, self.r[i]) # nparam-1 depending on rfunc h += fftconvolve(self.stress[i], b, 'full')[:self.npoints] if tindex is not None: h = h[tindex] return h
def freq_from_autocorr(signal, sampling_rate): corr = fftconvolve(signal, signal[::-1], mode='full') corr = corr[len(corr)//2:] d = np.diff(corr) start = find_index_by_true(d > 0)[0] peak = np.argmax(corr[start:]) + start px, py = parabolic(corr, peak) return sampling_rate / px
def write_preview_wav(wav_fp, note_beats_and_abs_times, wav_fs=11025.0): wav_len = int(wav_fs * (note_beats_and_abs_times[-1][1] + 0.05)) dt = 1.0 / wav_fs note_type_to_idx = {} idx = 0 for _, beat, time, note_type in note_beats_and_abs_times: if note_type == '0' * len(note_type): continue if note_type not in note_type_to_idx: note_type_to_idx[note_type] = idx idx += 1 num_note_types = len(note_type_to_idx) pulse_f = np.zeros((num_note_types, wav_len)) for _, beat, time, note_type in note_beats_and_abs_times: sample = int(time * wav_fs) if sample > 0 and sample < wav_len and note_type in note_type_to_idx: pulse_f[note_type_to_idx[note_type]][sample] = 1.0 scale = [440.0, 587.33, 659.25, 783.99] freqs = [scale[i % 4] * math.pow(2.0, (i // 4) - 1) for i in xrange(num_note_types)] metro_f = np.zeros(wav_len) for idx in xrange(num_note_types): click_len = 0.05 click_t = np.arange(0.0, click_len, dt) click_atk = 0.02 click_sus = 0.5 click_rel = 0.2 click_env = _linterp(0.0, [(click_atk, 1.0), (click_sus, 1.0), (click_rel, 0.0)], len(click_t)) click_f = click_env * np.sin(2.0 * np.pi * freqs[idx] * click_t) metro_f += fftconvolve(pulse_f[idx], click_f, mode='full')[:wav_len] #metro_f += pulse_f[idx][:wav_len] _wav_write(wav_fp, wav_fs, metro_f, normalize=True)
def convolve(A, B, FFT=None): """ Convolve two arrrays A & B row-wise. One or both can be one-dimensional for SIMO/SISO convolution Parameters ---------- A, B: array_like Data to perform the convolution on of shape [Nsignals x NSamples] FFT: bool, optional Selects wether time or frequency domain convolution is applied. Default: On if Nsamples > 500 for both Returns ------- out: array Array containing row-wise, linear convolution of A and B """ A = _np.atleast_2d(A) B = _np.atleast_2d(B) N_sigA, L_sigA = A.shape N_sigB, L_sigB = B.shape if FFT is None and (L_sigA > 500 and L_sigB > 500): FFT = True else: FFT = False if (N_sigA != N_sigB) and not (N_sigA == 1 or N_sigB == 1): raise ValueError('Number of rows must either match or at least one must be one-dimensional.') if N_sigA == 1 and N_sigB != 1: A = _np.broadcast_to(A, (N_sigB, L_sigA)) elif N_sigA != 1 and N_sigB == 1: B = _np.broadcast_to(B, (N_sigA, L_sigB)) out = [] for IDX, cur_row in enumerate(A): if FFT: out.append(fftconvolve(cur_row, B[IDX])) else: out.append(_np.convolve(cur_row, B[IDX])) return _np.array(out)
def _project(reference_sources, estimated_source, flen): """Least-squares projection of estimated source on the subspace spanned by delayed versions of reference sources, with delays between 0 and flen-1 """ nsrc = reference_sources.shape[0] nsampl = reference_sources.shape[1] # computing coefficients of least squares problem via FFT ## # zero padding and FFT of input data reference_sources = np.hstack((reference_sources, np.zeros((nsrc, flen - 1)))) estimated_source = np.hstack((estimated_source, np.zeros(flen - 1))) n_fft = int(2**np.ceil(np.log2(nsampl + flen - 1.))) sf = scipy.fftpack.fft(reference_sources, n=n_fft, axis=1) sef = scipy.fftpack.fft(estimated_source, n=n_fft) # inner products between delayed versions of reference_sources G = np.zeros((nsrc * flen, nsrc * flen)) for i in range(nsrc): for j in range(nsrc): ssf = sf[i] * np.conj(sf[j]) ssf = np.real(scipy.fftpack.ifft(ssf)) ss = toeplitz(np.hstack((ssf[0], ssf[-1:-flen:-1])), r=ssf[:flen]) G[i * flen: (i+1) * flen, j * flen: (j+1) * flen] = ss G[j * flen: (j+1) * flen, i * flen: (i+1) * flen] = ss.T # inner products between estimated_source and delayed versions of # reference_sources D = np.zeros(nsrc * flen) for i in range(nsrc): ssef = sf[i] * np.conj(sef) ssef = np.real(scipy.fftpack.ifft(ssef)) D[i * flen: (i+1) * flen] = np.hstack((ssef[0], ssef[-1:-flen:-1])) # Computing projection # Distortion filters try: C = np.linalg.solve(G, D).reshape(flen, nsrc, order='F') except np.linalg.linalg.LinAlgError: C = np.linalg.lstsq(G, D)[0].reshape(flen, nsrc, order='F') # Filtering sproj = np.zeros(nsampl + flen - 1) for i in range(nsrc): sproj += fftconvolve(C[:, i], reference_sources[i])[:nsampl + flen - 1] return sproj
def MacroBroad(data, vmacro, extend=True): """ This broadens the data by a given macroturbulent velocity. It works for small wavelength ranges. I need to make a better version that is accurate for large wavelength ranges! Sorry for the terrible variable names, it was copied from convol.pro in AnalyseBstar (Karolien Lefever) Parameters: =========== -data: kglib.utils.DataStructures.xypoint instance Stores the data to be broadened. The data MUST be equally-spaced before calling this! -vmacro: float The macroturbulent velocity, in km/s -extend: boolean If true, the y-axis will be extended to avoid edge-effects Returns: ======== A broadened version of data. """ # Make the kernel c = constants.c.cgs.value * units.cm.to(units.km) sq_pi = np.sqrt(np.pi) lambda0 = np.median(data.x) xspacing = data.x[1] - data.x[0] mr = vmacro * lambda0 / c ccr = 2 / (sq_pi * mr) px = np.arange(-data.size() / 2, data.size() / 2 + 1) * xspacing pxmr = abs(px) / mr profile = ccr * (np.exp(-pxmr ** 2) + sq_pi * pxmr * (erf(pxmr) - 1.0)) # Extend the xy axes to avoid edge-effects, if desired if extend: before = data.y[-profile.size / 2 + 1:] after = data.y[:profile.size / 2] extended = np.r_[before, data.y, after] first = data.x[0] - float(int(profile.size / 2.0 + 0.5)) * xspacing last = data.x[-1] + float(int(profile.size / 2.0 + 0.5)) * xspacing x2 = np.linspace(first, last, extended.size) conv_mode = "valid" else: extended = data.y.copy() x2 = data.x.copy() conv_mode = "same" # Do the convolution newdata = data.copy() newdata.y = fftconvolve(extended, profile / profile.sum(), mode=conv_mode) return newdata
def _compute_smoothed_histogram(values, bandwidth, coord_range, logtrans=False): """Approximate 1-D density estimation. Estimate 1-D probability densities at evenly-spaced grid points, for specified data. This method is based on creating a 1-D histogram of data points quantised with respect to evenly-spaced grid points. Probability densities are then estimated at the grid points by convolving the obtained histogram with a Gaussian kernel. Parameters ---------- values : np.array (N,) A vector containing the data for which to perform density estimation. Successive data points are indexed by the first axis in the array. bandwidth : float The desired KDE bandwidth. (When log-transformation of data is desired, bandwidth should be specified in log-space.) coord_range: (2,) Minimum and maximum values of coordinate on which to evaluate the smoothed histogram. logtrans : boolean Whether or not to log-transform the data before performing density estimation. Returns ------- np.array (M-1,) An array of estimated probability densities at specified grid points. """ if logtrans: ber = [np.log10(extreme) for extreme in coord_range] bin_edges = np.logspace(*ber, num=DENSITY_N + 1) bin_edge_range = ber[1] - ber[0] else: bin_edges = np.linspace(*coord_range, num=DENSITY_N + 1) bin_edge_range = coord_range[1] - coord_range[0] if values.size < 2: # Return zeros if there are too few points to do anything useful. return bin_edges[:-1], np.zeros(bin_edges.shape[0] - 1) # Bin the values H = np.histogram(values, bin_edges)[0] relative_bw = bandwidth / bin_edge_range K = _compute_gaussian_kernel(H.shape, relative_bw) pdf = signal.fftconvolve(H, K, mode='same') # Return lower edges of bins and normalized pdf return bin_edges[:-1], pdf / np.trapz(pdf, bin_edges[:-1])
def _compute_smoothed_histogram2d(values, bandwidth, coord_ranges, logtrans=False): """Approximate 2-D density estimation. Estimate 2-D probability densities at evenly-spaced grid points, for specified data. This method is based on creating a 2-D histogram of data points quantised with respect to evenly-spaced grid points. Probability densities are then estimated at the grid points by convolving the obtained histogram with a Gaussian kernel. Parameters ---------- values : np.array (N,2) A 2-D array containing the data for which to perform density estimation. Successive data points are indexed by the first axis in the array. The second axis indexes x and y coordinates of data points (values[:,0] and values[:,1] respectively). bandwidth : array-like (2,) The desired KDE bandwidths for x and y axes. (When log-transformation of data is desired, bandwidths should be specified in log-space.) coord_range: (2,2) Minimum and maximum values of coordinates on which to evaluate the smoothed histogram. logtrans : array-like (2,) A 2-element boolean array specifying whether or not to log-transform the x or y coordinates of the data before performing density estimation. Returns ------- np.array (M-1, M-1) An array of estimated probability densities at specified grid points. """ bin_edges = [] bedge_range = [] for minmax, lt in zip(coord_ranges, logtrans): if lt: ber = [np.log10(extreme) for extreme in minmax] bin_edges.append(np.logspace(*ber, num=DENSITY_N + 1)) bedge_range.append(ber[1] - ber[0]) else: bin_edges.append(np.linspace(*minmax, num=DENSITY_N + 1)) bedge_range.append(minmax[1] - minmax[0]) # Bin the observations H = np.histogram2d(values[:, 0], values[:, 1], bins=bin_edges)[0] relative_bw = [bw / berange for bw, berange in zip(bandwidth, bedge_range)] K = _compute_gaussian_kernel(H.shape, relative_bw) pdf = signal.fftconvolve(H.T, K, mode='same') # Normalize pdf bin_centers = [edges[:-1] + np.diff(edges) / 2. for edges in bin_edges] pdf /= np.trapz(np.trapz(pdf, bin_centers[1]), bin_centers[0]) # Return lower bin edges and density return bin_edges[0][:-1], bin_edges[1][:-1], pdf
def conv(data, kernel, mode='full', method='fft', use_jit=True): """Convoles data with a kernel using either FFT or sparse convolution This function convolves data with a kernel, relying either on the fast Fourier transform (FFT) or a sparse convolution function. Parameters ---------- data : array_like First input, typically the data array kernel : array_like Second input, typically the kernel mode : str {'full', 'valid', 'same'}, optional, default: 'full' A string indicating the size of the output: - ``full``: The output is the full discrete linear convolution of the inputs. - ``valid``: The output consists only of those elements that do not rely on zero-padding. - ``same``: The output is the same size as `data`, centered with respect to the 'full' output. method : str {'fft', 'sparse'}, optional, default: 'fft' A string indicating the convolution method: - ``fft``: Use the fast Fourier transform (FFT). - ``sparse``: Use the sparse convolution. use_jit : bool, optional, default: True A flag indicating whether to use Numba's just-in-time compilation option (only relevant for `method`=='sparse'). """ if method.lower() == 'fft': # Use FFT: faster on non-sparse data conved = sps.fftconvolve(data, kernel, mode) elif method.lower() == 'sparse': # Use sparseconv: faster on sparse data conved = sparseconv(data, kernel, mode, use_jit) else: raise ValueError("Acceptable methods are: 'fft', 'sparse'.") return conved
def gen_speech_at_mic_stft(phi_ks, source_signals, mic_array_coord, noise_power, fs, fft_size=1024): """ generate microphone signals with short time Fourier transform :param phi_ks: azimuth of the acoustic sources :param source_signals: speech signals for each arrival angle, one per row :param mic_array_coord: x and y coordinates of the microphone array :param noise_power: the variance of the microphone noise signal :param fs: sampling frequency :param fft_size: number of FFT bins :return: y_hat_stft: received (complex) signal at microphones y_hat_stft_noiseless: the noiseless received (complex) signal at microphones """ frame_shift_step = np.int(fft_size / 1.) # half block overlap for adjacent frames K = source_signals.shape[0] # number of point sources num_mic = mic_array_coord.shape[1] # number of microphones # Generate the impulse responses for the array and source directions impulse_response = gen_far_field_ir(np.reshape(phi_ks, (1, -1), order='F'), mic_array_coord, fs) # Now generate all the microphone signals y = np.zeros((num_mic, source_signals.shape[1] + impulse_response.shape[2] - 1), dtype=np.float32) for src in xrange(K): for mic in xrange(num_mic): y[mic] += fftconvolve(impulse_response[src, mic], source_signals[src]) # Now do the short time Fourier transform # The resulting signal is M x fft_size/2+1 x number of frames y_hat_stft_noiseless = \ np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T for signal in y]) / np.sqrt(fft_size) # Add noise to the signals y_noisy = y + np.sqrt(noise_power) * np.array(np.random.randn(*y.shape), dtype=np.float32) # compute sources stft source_stft = \ np.array([pra.stft(s_loop, fft_size, frame_shift_step, transform=mkl_fft.rfft).T for s_loop in source_signals]) / np.sqrt(fft_size) y_hat_stft = \ np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T for signal in y_noisy]) / np.sqrt(fft_size) return y_hat_stft, y_hat_stft_noiseless, source_stft
def gen_sig_at_mic_stft(phi_ks, alpha_ks, mic_array_coord, SNR, fs, fft_size=1024, Ns=256): """ generate microphone signals with short time Fourier transform :param phi_ks: azimuth of the acoustic sources :param alpha_ks: power of the sources :param mic_array_coord: x and y coordinates of the microphone array :param SNR: signal to noise ratio at the microphone :param fs: sampling frequency :param fft_size: number of FFT bins :param Ns: number of time snapshots used to estimate covariance matrix :return: y_hat_stft: received (complex) signal at microphones y_hat_stft_noiseless: the noiseless received (complex) signal at microphones """ frame_shift_step = np.int(fft_size / 1.) # half block overlap for adjacent frames K = alpha_ks.shape[0] # number of point sources num_mic = mic_array_coord.shape[1] # number of microphones # Generate the impulse responses for the array and source directions impulse_response = gen_far_field_ir(np.reshape(phi_ks, (1, -1), order='F'), mic_array_coord, fs) # Now generate some noise # source_signal = np.random.randn(K, Ns * fft_size) * np.sqrt(alpha_ks[:, np.newaxis]) source_signal = np.random.randn(K, fft_size + (Ns - 1) * frame_shift_step) * \ np.sqrt(np.reshape(alpha_ks, (-1, 1), order='F')) # Now generate all the microphone signals y = np.zeros((num_mic, source_signal.shape[1] + impulse_response.shape[2] - 1), dtype=np.float32) for src in xrange(K): for mic in xrange(num_mic): y[mic] += fftconvolve(impulse_response[src, mic], source_signal[src]) # Now do the short time Fourier transform # The resulting signal is M x fft_size/2+1 x number of frames y_hat_stft_noiseless = \ np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T for signal in y]) / np.sqrt(fft_size) # compute noise variance based on SNR signal_energy = linalg.norm(y_hat_stft_noiseless.flatten()) ** 2 noise_energy = signal_energy / 10 ** (SNR * 0.1) sigma2_noise = noise_energy / y_hat_stft_noiseless.size # Add noise to the signals y_noisy = y + np.sqrt(sigma2_noise) * np.array(np.random.randn(*y.shape), dtype=np.float32) y_hat_stft = \ np.array([pra.stft(signal, fft_size, frame_shift_step, transform=mkl_fft.rfft).T for signal in y_noisy]) / np.sqrt(fft_size) return y_hat_stft, y_hat_stft_noiseless