我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用scipy.signal.correlate()。
def filter_and_smooth_angular_velocity(angular_velocity, low_pass_kernel_size, clip_percentile, plot=False): """Reduce the noise in a velocity signal.""" max_value = np.percentile(angular_velocity, clip_percentile) print("Clipping angular velocity norms to {} rad/s ...".format(max_value)) angular_velocity_clipped = np.clip(angular_velocity, -max_value, max_value) print("Done clipping angular velocity norms...") low_pass_kernel = np.ones((low_pass_kernel_size, 1)) / low_pass_kernel_size print("Smoothing with kernel size {} samples...".format(low_pass_kernel_size)) angular_velocity_smoothed = signal.correlate(angular_velocity_clipped, low_pass_kernel, 'same') print("Done smoothing angular velocity norms...") if plot: plot_angular_velocities("Angular Velocities", angular_velocity, angular_velocity_smoothed, True) return angular_velocity_smoothed.copy()
def x_corr(a,b,center_time_s=1000.0,window_len_s=50.0,plot=True): center_index = int(center_time_s/a.dt) window_index = int(window_len_s/(a.dt)) print "center_index is", center_index print "window_index is", window_index t1 = a.trace_x[(center_index - window_index) : (center_index + window_index)] t2 = b.trace_x[(center_index - window_index) : (center_index + window_index)] print t1 time_window = np.linspace((-window_len_s/2.0), (window_len_s/2), len(t1)) #print time_window #plt.plot(time_window, t1) #plt.plot(time_window, t2) #plt.show() x_corr_time = correlate(t1, t2) delay = (np.argmax(x_corr_time) - (len(x_corr_time)/2) ) * a.dt #print "the delay is ", delay return delay
def is_periodic(aud_sample, SAMPLING_RATE = 8000): ''' :param aud_sample: Numpy 1D array rep of audio sample :param SAMPLING_RATE: Used to focus on human speech freq range :return: True if periodic, False if aperiodic ''' threshold = 20 # Use auto-correlation to find if there is enough periodicity in [50-400] Hz range values = signal.correlate(aud_sample, aud_sample, mode='full') # [50-400 Hz] corresponds to [2.5-20] ms OR [20-160] samples for 8 KHz sampling rate l_idx = int(SAMPLING_RATE*2.5/1000) r_idx = int(SAMPLING_RATE*20/1000) values = values[len(values)/2:] subset_values = values[l_idx:r_idx] if np.argmax(subset_values) < threshold: return False else: return True
def main(): """Initialize kernel, apply it to an image (via crosscorrelation, convolution).""" img = data.camera() kernel = np.array([ [-1, -2, -1], [0, 0, 0], [1, 2, 1] ]) cc_response = crosscorrelate(img, kernel) cc_gt = signal.correlate(img, kernel, mode="same") conv_response = convolve(img, kernel) conv_gt = signal.convolve(img, kernel, mode="same") util.plot_images_grayscale( [img, cc_response, cc_gt, conv_response, conv_gt], ["Image", "Cross-Correlation", "Cross-Correlation (Ground Truth)", "Convolution", "Convolution (Ground Truth)"] )
def grad_magnitude(img): """Calculate the gradient magnitude of an image. Args: img The image Returns: gradient image""" img = img / 255.0 sobel_y = np.array([ [-1, -2, -1], [0, 0, 0], [1, 2, 1] ]) sobel_x = np.rot90(sobel_y) # rotates counter-clockwise # apply x/y sobel filter to get x/y gradients imgx = signal.correlate(img, sobel_x, mode="same") imgy = signal.correlate(img, sobel_y, mode="same") imgmag = np.sqrt(imgx**2 + imgy**2) return imgmag
def main(): """Load image, apply filter, plot.""" img = data.camera() # just like sobel, but no -2/+2 in the middle prewitt_y = np.array([ [-1, -1, -1], [0, 0, 0], [1, 1, 1] ]) prewitt_x = np.rot90(prewitt_y) # rotates counter-clockwise img_sx = signal.correlate(img, prewitt_x, mode="same") img_sy = signal.correlate(img, prewitt_y, mode="same") g_magnitude = np.sqrt(img_sx**2 + img_sy**2) ground_truth = skifilters.prewitt(data.camera()) util.plot_images_grayscale( [img, img_sx, img_sy, g_magnitude, ground_truth], ["Image", "Prewitt (x)", "Prewitt (y)", "Prewitt (both/magnitude)", "Prewitt (Ground Truth)"] )
def test_pdos_1d(): pad=lambda x: pad_zeros(x, nadd=len(x)-1) n=500; w=welch(n) # 1 second signal t=np.linspace(0,1,n); dt=t[1]-t[0] # sum of sin()s with random freq and phase shift, 10 frequencies from # f=0...100 Hz v=np.array([np.sin(2*np.pi*f*t + rand()*2*np.pi) for f in rand(10)*100]).sum(0) f=np.fft.fftfreq(2*n-1, dt)[:n] c1=mirror(ifft(abs(fft(pad(v)))**2.0)[:n].real) c2=correlate(v,v,'full') c3=mirror(acorr(v,norm=False)) assert np.allclose(c1, c2) assert np.allclose(c1, c3) p1=(abs(fft(pad(v)))**2.0)[:n] p2=(abs(fft(mirror(acorr(v,norm=False)))))[:n] assert np.allclose(p1, p2) p1=(abs(fft(pad(v*w)))**2.0)[:n] p2=(abs(fft(mirror(acorr(v*w,norm=False)))))[:n] assert np.allclose(p1, p2)
def conv4d(x, weights, bias, output): # print 'called' assert len(x.shape) == 4 and len(output.shape) == 4 batch_size, input_channel = x.shape[:2] output_batch_size, output_channel = output.shape[:2] num_filters, filter_channel = weights.shape[:2] assert batch_size == output_batch_size, '%d vs %d' % (batch_size, output_batch_size) assert output_channel == num_filters assert filter_channel == input_channel # func = convolve if true_conv else correlate for img_idx in range(batch_size): for c in range(output_channel): output[img_idx][c] = (correlate(x[img_idx], weights[c], mode='valid') + bias[c].reshape((1, 1, 1))) # if img_idx == 0 and c == 0: # print output[img_idx][c] # print bias[c].reshape((1, 1, 1))
def calculate_time_offset_from_signals(times_A, signal_A, times_B, signal_B, plot=False, block=True): """ Calculates the time offset between signal A and signal B. """ convoluted_signals = signal.correlate(signal_B, signal_A) dt_A = np.mean(np.diff(times_A)) offset_indices = np.arange(-len(signal_A) + 1, len(signal_B)) max_index = np.argmax(convoluted_signals) offset_index = offset_indices[max_index] time_offset = dt_A * offset_index + times_B[0] - times_A[0] if plot: plot_results(times_A, times_B, signal_A, signal_B, convoluted_signals, time_offset, block=block) return time_offset
def local_mean(img, size=3): """ Compute a image of the local average """ structure_element = np.ones((size, size), dtype=img.dtype) l_mean = signal.correlate(img, structure_element, mode='same') l_mean /= size**2 return l_mean
def local_var(img, size=3): """ Compute a image of the local variance """ structure_element = np.ones((size, size), dtype=img.dtype) l_var = signal.correlate(img**2, structure_element, mode='same') l_var /= size**2 l_var -= local_mean(img, size=size)**2 return l_var
def read(self, station_name, plot=True): seismogram = np.loadtxt(station_name) self.t = seismogram[:,0] self.trace_x = seismogram[:,1] self.nt = len(self.t) self.dt = np.max(self.t)/len(self.t) #if (plot == true): plt.plot(self.t, self.trace_x) ################################################################ # cross correlate travel times ################################################################
def harris_ones(img, window_size, k=0.05): """Calculate the harris score based on a window function of diagonal ones. Args: img The image to use for corner detection. window_size Size of the window (NxN). k Weighting parameter during the final scoring (det vs. trace). Returns: Corner score image """ # Gradients img = skiutil.img_as_float(img) imgy, imgx = np.gradient(img) imgxy = imgx * imgy imgxx = imgx ** 2 imgyy = imgy ** 2 # window function (matrix of diagonal ones) window = np.ones((window_size, window_size)) # compute parts of harris matrix a11 = signal.correlate(imgxx, window, mode="same") / window_size a12 = signal.correlate(imgxy, window, mode="same") / window_size a21 = a12 a22 = signal.correlate(imgyy, window, mode="same") / window_size # compute score per pixel det_a = a11 * a22 - a12 * a21 trace_a = a11 + a22 return det_a - k * trace_a ** 2
def main(): """Load image, apply Canny, plot.""" img = skiutil.img_as_float(data.camera()) img = filters.gaussian_filter(img, sigma=1.0) sobel_y = np.array([ [-1, -2, -1], [0, 0, 0], [1, 2, 1] ]) sobel_x = np.rot90(sobel_y) # rotates counter-clockwise img_sx = signal.correlate(img, sobel_x, mode="same") img_sy = signal.correlate(img, sobel_y, mode="same") g_magnitudes = np.sqrt(img_sx**2 + img_sy**2) / np.sqrt(2) g_orientations = np.arctan2(img_sy, img_sx) g_mag_nonmax = non_maximum_suppression(g_magnitudes, g_orientations) canny = hysteresis(g_mag_nonmax, 0.35, 0.05) ground_truth = feature.canny(data.camera()) util.plot_images_grayscale( [img, g_magnitudes, g_mag_nonmax, canny, ground_truth], ["Image", "After Sobel", "After nonmax", "Canny", "Canny (Ground Truth)"] )
def apply_gauss(img, filter_mask): """Apply a gaussian filter to an image. Args: img The image filter_mask The filter mask (2D numpy array) Returns: Modified image """ return signal.correlate(img, filter_mask, mode="same") / np.sum(filter_mask) # from http://stackoverflow.com/questions/17190649/how-to-obtain-a-gaussian-filter-in-python
def erosion(img): """Perform Erosion on an image. Args: img The image to be changed. Returns: Changed image """ img = np.copy(img).astype(np.float32) mask = np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]]) corr = signal.correlate(img, mask, mode="same") / 9.0 corr[corr < 255.0 - 1e-8] = 0 corr = corr.astype(np.uint8) return corr
def main(): """Load image, apply sobel (to get x/y gradients), plot the results.""" img = data.camera() sobel_y = np.array([ [-1, -2, -1], [0, 0, 0], [1, 2, 1] ]) sobel_x = np.rot90(sobel_y) # rotates counter-clockwise # apply x/y sobel filter to get x/y gradients img_sx = signal.correlate(img, sobel_x, mode="same") img_sy = signal.correlate(img, sobel_y, mode="same") # combine x/y gradients to gradient magnitude # scikit-image's implementation divides by sqrt(2), not sure why img_s = np.sqrt(img_sx**2 + img_sy**2) / np.sqrt(2) # create binarized image threshold = np.average(img_s) img_s_bin = np.zeros(img_s.shape) img_s_bin[img_s > threshold] = 1 # generate ground truth (scikit-image method) ground_truth = skifilters.sobel(data.camera()) # plot util.plot_images_grayscale( [img, img_sx, img_sy, img_s, img_s_bin, ground_truth], ["Image", "Sobel (x)", "Sobel (y)", "Sobel (magnitude)", "Sobel (magnitude, binarized)", "Sobel (Ground Truth)"] )
def apply(self,shot): super(AveragingVarNormalizer,self).apply(shot) window_decay = self.conf['data']['window_decay'] window_size = self.conf['data']['window_size'] window = exponential(window_size,0,window_decay,False) window /= np.sum(window) shot.signals = apply_along_axis(lambda m : correlate(m,window,'valid'),axis=0,arr=shot.signals) shot.ttd = shot.ttd[-shot.signals.shape[0]:]
def translate_group(self, signalimg, group, translateaxis): n_channels = len(self.locs) all_xcorr = np.zeros((1, n_channels)) all_da = np.zeros((1, n_channels)) if translateaxis == 'x': proplane = 'xy' elif translateaxis == 'y': proplane = 'xy' elif translateaxis == 'z': proplane = 'xz' plotmode = 0 for j in range(n_channels): if plotmode: fig = plt.figure() ax1 = fig.add_subplot(1, 3, 1) plt.plot(signalimg[j]) ax2 = fig.add_subplot(1, 3, 2) if self.dataset_dialog.checks[j].isChecked(): index = self.group_index[j][group].nonzero()[1] x_rot = self.locs[j].x[index] y_rot = self.locs[j].y[index] z_rot = self.locs[j].z[index] xcorr_max = 0.0 plane = self.render_planes(x_rot, y_rot, z_rot, proplane, self.pixelsize) # if translateaxis == 'x': projection = np.sum(plane, axis=0) elif translateaxis == 'y': projection = np.sum(plane, axis=1) elif translateaxis == 'z': projection = np.sum(plane, axis=1) if plotmode: plt.plot(projection) #print('Step X') #ax3 = fig.add_subplot(1,3,3) #plt.imshow(plane, interpolation='nearest', cmap=plt.cm.ocean) corrval = np.max(signal.correlate(signalimg[j],projection)) shiftval = np.argmax(signal.correlate(signalimg[j], projection))-len(signalimg[j])+1 all_xcorr[0,j] = corrval all_da[0,j] = shiftval/self.oversampling if plotmode: plt.show() #value with biggest cc value form table maximumcc = np.argmax(np.sum(all_xcorr,axis = 1)) dafinal = np.mean(all_da[maximumcc,:]) for j in range(n_channels): index = self.group_index[j][group].nonzero()[1] if translateaxis == 'x': self.locs[j].x[index] += dafinal elif translateaxis == 'y': self.locs[j].y[index] += dafinal elif translateaxis == 'z': self.locs[j].z[index] += dafinal*self.pixelsize
def cross_correlation(self, reference, new_array, mode='full'): """Do cross correlation to two arrays Args: reference (array): Reference array. new_array (array): Array to be matched. mode (str): Correlation mode for `scipy.signal.correlate`. Returns: correlation_value (int): Shift value in pixels. """ # print(reference, new_array) cyaxis2 = new_array if float(re.sub('[A-Za-z" ]', '', self.lamp_header['SLIT'])) > 3: box_width = float( re.sub('[A-Za-z" ]', '', self.lamp_header['SLIT'])) / 0.15 self.log.debug('BOX WIDTH: {:f}'.format(box_width)) box_kernel = Box1DKernel(width=box_width) max_before = np.max(reference) cyaxis1 = convolve(reference, box_kernel) max_after = np.max(cyaxis1) cyaxis1 *= max_before / max_after else: gaussian_kernel = Gaussian1DKernel(stddev=2.) cyaxis1 = convolve(reference, gaussian_kernel) cyaxis2 = convolve(new_array, gaussian_kernel) # plt.plot(cyaxis1, color='k', label='Reference') # plt.plot(cyaxis2, color='r', label='New Array') # plt.plot(reference, color='g') # plt.show() try: ccorr = signal.correlate(cyaxis1, cyaxis2, mode=mode) except ValueError: print(cyaxis1, cyaxis2) # print('Corr ', ccorr) max_index = np.argmax(ccorr) x_ccorr = np.linspace(-int(len(ccorr) / 2.), int(len(ccorr) / 2.), len(ccorr)) correlation_value = x_ccorr[max_index] if False: plt.ion() plt.title('Cross Correlation') plt.xlabel('Lag Value') plt.ylabel('Correlation Value') plt.plot(x_ccorr, ccorr) plt.draw() plt.pause(2) plt.clf() plt.ioff() return correlation_value
def calc_timeshift(self, st_b, allow_negative_CC=False): """ dt_all, CC = calc_timeshift(self, st_b, allow_negative_CC) Calculate timeshift between two waveforms using the maximum of the cross-correlation function. Parameters ---------- self : obspy.Stream Stream that contains the reference traces st_b : obspy.Stream Stream that contains the traces to compare allow_negative_CC : boolean Pick the maximum of the absolute values of CC(t). Useful, if polarity may be wrong. Returns ------- dt_all : dict Dictionary with entries station.location and the estimated time shift in seconds. CC : dict Dictionary with the correlation coefficients for each station. """ dt_all = dict() CC_all = dict() for tr_a in self: try: tr_b = st_b.select(station=tr_a.stats.station, location=tr_a.stats.location)[0] corr = signal.correlate(tr_a.data, tr_b.data) if allow_negative_CC: idx_CCmax = np.argmax(abs(corr)) CC = abs(corr[idx_CCmax]) else: idx_CCmax = np.argmax(corr) CC = corr[idx_CCmax] dt = (idx_CCmax - tr_a.stats.npts + 1) * tr_a.stats.delta CC /= np.sqrt(np.sum(tr_a.data**2) * np.sum(tr_b.data**2)) # print('%s.%s: %4.1f sec, CC: %f' % # (tr_a.stats.station, tr_a.stats.location, dt, CC)) code = '%s.%s' % (tr_a.stats.station, tr_a.stats.location) dt_all[code] = dt CC_all[code] = CC except IndexError: print('Did not find %s' % (tr_a.stats.station)) return dt_all, CC_all