我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.correlate()。
def get_timepixel_g2(oned_count):
    """Return the normalised autocorrelation of a 1-D count series.

    For every lag ``k`` in ``0 .. n-1`` the raw autocorrelation is divided by
    (number of overlapping samples) * (mean of samples ``k..n-1``) *
    (mean of samples ``0..n-k-1``).

    Parameters
    ----------
    oned_count : sequence of numbers
        The 1-D series to correlate with itself.

    Returns
    -------
    numpy.ndarray
        Array of length ``len(oned_count)``; entry ``k`` is the normalised
        correlation at lag ``k``.
    """
    size = len(oned_count)
    # Number of sample pairs that overlap at each lag: size, size - 1, ..., 1.
    overlap = np.arange(size, 0, -1)
    # Mean of the trailing samples that take part in the product at each lag.
    tail_means = np.array([np.average(oned_count[lag:]) for lag in range(size)])
    # Mean of the leading samples that take part in the product at each lag.
    head_means = np.array(
        [np.average(oned_count[0:size - lag]) for lag in range(size)]
    )
    norm = overlap * tail_means * head_means
    raw = np.correlate(oned_count, oned_count, mode='full')
    # The last `size` entries of the full correlation are lags 0 .. size - 1.
    return raw[-size:] / norm
def gi_correlation(self, p_gi_len, p_symbol_len, signal):
    """Mode detection: correlate the start of ``signal`` with a delayed part.

    The first ``p_gi_len - 1`` samples are compared with the samples from
    offset ``p_symbol_len - p_gi_len`` up to (but excluding) the last sample.
    Both segments are scaled to unit energy before correlating.

    Parameters
    ----------
    p_gi_len : int
        Length used to cut the leading segment.
    p_symbol_len : int
        Length used, together with ``p_gi_len``, to derive the delay.
    signal : numpy.ndarray
        Input samples.

    Returns
    -------
    scalar
        First element of the 'valid'-mode correlation of the two segments.
    """
    # Delayed signal: offset at which the second segment starts.
    delay = p_symbol_len - p_gi_len
    head = signal[0:p_gi_len - 1]
    delayed = signal[delay:len(signal) - 1]

    # Normalisation: divide each segment by the square root of its energy.
    head_energy = np.sum(np.square(np.abs(head)))
    head = head / np.sqrt(head_energy)
    delayed_energy = np.sum(np.square(np.abs(delayed)))
    delayed = delayed / np.sqrt(delayed_energy)

    # Correlation
    return np.correlate(head, delayed)[0]
def crosscorr(data, fb, fs, pairs=None):
    """Build a channel-by-channel matrix of 'valid'-mode correlations.

    Parameters
    ----------
    data : array-like, 2-D
        Unpacked as ``(n_channels, n_samples)``.
    fb, fs :
        Passed unchanged to ``analytic_signal``.
        NOTE(review): presumably frequency band and sampling rate -- confirm.
    pairs : optional
        Not used by this function.

    Returns
    -------
    numpy.ndarray
        ``float32`` array of shape ``(n_channels, n_channels)``; entry
        ``[i, j]`` holds ``np.correlate(filtered[i], filtered[j], 'valid')``.
    """
    channel_count, _sample_count = np.shape(data)
    band_limited, _, _ = analytic_signal(data, fb, fs)

    # Pull every channel row out once instead of re-indexing in the loops.
    rows = [band_limited[k, ] for k in range(channel_count)]

    result = np.zeros([channel_count, channel_count], dtype=np.float32)
    for row_idx in range(channel_count):
        for col_idx in range(channel_count):
            result[row_idx, col_idx] = np.correlate(
                rows[row_idx], rows[col_idx], mode='valid'
            )
    return result
def estimate_range(tx, rx, fs, quiet=False):
    """Estimate distance from the cross-correlation peak of ``tx`` and ``rx``.

    tx: the known, noise-free, undelayed transmit signal (bistatic radars agree
        beforehand on the pseudorandom sequence)
    rx: the noisy, corrupted, interference, jammed signal to estimate distance
        from
    fs: baseband sample frequency
    quiet: when false (and a ``figure`` factory is available) the correlation
        magnitude is plotted around the peak

    Returns the distance estimate ``-peak_lag / fs / 2 * c``.
    """
    xcorr = np.correlate(tx, rx, 'full')
    # Lag axis centred so that index size // 2 corresponds to zero lag.
    lag_axis = np.arange(xcorr.size) - xcorr.size // 2
    # NOTE(review): the peak is taken from xcorr itself, while the plot marker
    # below uses the magnitude's argmax -- confirm they agree for complex data.
    peak_lag = lag_axis[xcorr.argmax()]
    range_m = -peak_lag / fs / 2 * c

    magnitude = abs(xcorr)  # magnitude of complex cross-correlation

    plotting_enabled = (not quiet) and (figure is not None)
    if plotting_enabled:
        axes = figure().gca()
        axes.plot(lag_axis, magnitude)
        axes.plot(peak_lag, magnitude[magnitude.argmax()], color='red', marker='*')
        axes.set_title('cross-correlation of receive waveform with transmit waveform')
        axes.set_ylabel('$|R_{xy}|$')
        axes.set_xlabel('lags')
        axes.set_xlim(peak_lag - 100, peak_lag + 100)

    return range_m
def procchunk(rx, tx, P: dict):
    """Sum the 'same'-mode cross-correlation of ``tx`` with each PRI of ``rx``.

    Parameters
    ----------
    rx : numpy.ndarray
        Received samples; resampled first when ``P['rxfn']`` is not ``None``.
    tx : numpy.ndarray
        Transmit waveform.
    P : dict
        Reads the keys ``rxfn``, ``resample`` (object with ``numerator`` and
        ``denominator``), ``txfs``, ``pri``, ``Nchirp`` and ``verbose``.

    Returns
    -------
    The accumulated correlation (``0.`` if ``P['Nchirp']`` is zero).

    Raises
    ------
    AssertionError
        If one PRI is shorter than ``tx`` or the number of complete PRIs in
        ``rx`` differs from ``P['Nchirp']``.
    """
    if P['rxfn'] is not None:
        ratio = P['resample']
        rx = scipy.signal.resample_poly(rx, ratio.numerator, ratio.denominator)

    fs = P['txfs']
    # %% resamples parameters
    samples_per_pri = int(fs * P['pri'])  # Number of RX samples per PRI (resampled)
    assert samples_per_pri >= tx.size, 'PRI must be longer than chirp length!'

    complete_pris = rx.size // samples_per_pri  # number of complete PRIs received in this data
    assert complete_pris == P['Nchirp']

    Rxy = 0.
    for k in range(P['Nchirp']):
        first = k * samples_per_pri
        Rxy += np.correlate(tx, rx[first:first + samples_per_pri], 'same')

    if P['verbose']:
        plotxcor(Rxy, fs)
        draw()
        pause(0.1)

    return Rxy
def find_audio_period(aclip, t_min=.1, t_max=2, t_res=.01):
    """Find the period, in seconds, of an audio clip.

    The beat is then given by ``bpm = 60 / T``.

    ``t_min`` and ``t_max`` are bounds for the returned value and ``t_res`` is
    the numerical precision. ``aclip`` must expose ``fps`` and
    ``iter_chunks(chunksize)``.
    """
    samples_per_chunk = int(t_res * aclip.fps)
    chunk_duration = 1.0 * samples_per_chunk / aclip.fps

    # One "volume" (sum of squared samples) per chunk.
    volumes = np.array(
        [(chunk ** 2).sum() for chunk in aclip.iter_chunks(samples_per_chunk)]
    )
    centred = volumes - volumes.mean()

    # Keep the non-negative lags of the autocorrelation.
    autocorr = np.correlate(centred, centred, mode='full')[-len(centred):]

    # Blank out every lag outside [t_min, t_max) so argmax cannot pick it.
    lowest = int(t_min / chunk_duration)
    autocorr[:lowest] = 0
    highest = int(t_max / chunk_duration)
    autocorr[highest:] = 0

    return chunk_duration * np.argmax(autocorr)
def extractMseq(cover, stego, secret_length, m, tau=1):
    """Extract secret information by spread spectrum using an M-sequence.

    @param cover         : cover data (2 dimensional np.ndarray)
    @param stego         : stego data (2 dimensional np.ndarray)
    @param secret_length : length of secret information
    @param m             : M-sequence
    @param tau           : embed shift interval
    @return secret       : extracted secret information (list)
    """
    cover_vec = _image2vrctor(cover)
    stego_vec = _image2vrctor(stego)

    seq_len = len(m)
    # Embedded signal: difference of the two vectors, sampled every `tau`
    # elements within the first `seq_len` positions.
    embedded = (stego_vec - cover_vec)[:seq_len:tau]

    correlated = correlate(m, embedded, cycle=CYCLE)

    # ((seq_len - 1) * 2 + 1) // 2 always reduces to seq_len - 1.
    first = seq_len - 1
    window = correlated[first:first + secret_length]
    return [_checkData(value) for value in window]
def _convolve_or_correlate(f, a, v, mode, propagate_mask): """ Helper function for ma.correlate and ma.convolve """ if propagate_mask: # results which are contributed to by either item in any pair being invalid mask = ( f(getmaskarray(a), np.ones(np.shape(v), dtype=np.bool), mode=mode) | f(np.ones(np.shape(a), dtype=np.bool), getmaskarray(v), mode=mode) ) data = f(getdata(a), getdata(v), mode=mode) else: # results which are not contributed to by any pair of valid elements mask = ~f(~getmaskarray(a), ~getmaskarray(v)) data = f(filled(a, 0), filled(v, 0), mode=mode) return masked_array(data, mask=mask)
def plot_correlate():
    """Plot the autocorrelation function computed by numpy.

    Reads a fixed WAV file, takes a 0.01 s segment starting at 0.2 s, plots
    its 'same'-mode autocorrelation and saves the figure under the root name
    ``autocorr9``. Afterwards the non-negative-lag half is normalised in
    place (by overlap length, then by its first element).
    """
    recording = thinkdsp.read_wave('28042__bcjordan__voicedownbew.wav')
    recording.normalize()
    segment = recording.segment(start=0.2, duration=0.01)

    # Result is not used below; the call is kept as in the original flow.
    lags, corrs = autocorr(segment)

    samples = segment.ys
    corrs2 = numpy.correlate(samples, samples, mode='same')
    thinkplot.plot(corrs2)
    thinkplot.config(xlabel='lag',
                     ylabel='correlation',
                     xlim=[0, len(corrs2)])
    thinkplot.save(root='autocorr9')

    total = len(corrs2)
    midpoint = total // 2
    # `upper_half` is a view, so the in-place divisions also modify corrs2.
    upper_half = corrs2[midpoint:]
    overlap_lengths = range(total, midpoint, -1)
    upper_half /= overlap_lengths
    upper_half /= upper_half[0]
def estimated_autocorrelation(x):
    """Estimate the autocorrelation of ``x`` for lags ``0 .. len(x) - 1``.

    The series is centred on its mean, correlated with itself, and each lag is
    divided by ``variance * (number of overlapping samples)``.

    Parameters
    ----------
    x : numpy.ndarray
        1-D input series.

    Returns
    -------
    numpy.ndarray
        Autocorrelation estimate, one value per lag.

    Raises
    ------
    AssertionError
        If numpy's correlation disagrees with the direct lag-by-lag sum.
    """
    size = len(x)
    spread = x.var()
    centred = x - x.mean()

    # Non-negative lags are the last `size` entries of the full correlation.
    raw = np.correlate(centred, centred, mode='full')[-size:]

    # Sanity check against the explicit definition of each lag.
    direct = np.array(
        [(centred[:size - lag] * centred[-(size - lag):]).sum()
         for lag in range(size)]
    )
    assert np.allclose(raw, direct)

    overlap = np.arange(size, 0, -1)
    return raw / (spread * overlap)
def _delta(x, window): return np.correlate(x, window, mode="same")
def test_float(self):
    """Check full-mode ``np.correlate`` on float inputs against the fixtures.

    Covers both argument orders, a shortened second operand, a reversed
    first operand and the strided ``xs`` fixture.
    """
    # ``np.float`` was only an alias of the builtin ``float`` and has been
    # removed from NumPy; the builtin selects the same dtype.
    self._setup(float)

    z = np.correlate(self.x, self.y, 'full')
    assert_array_almost_equal(z, self.z1)

    z = np.correlate(self.x, self.y[:-1], 'full')
    assert_array_almost_equal(z, self.z1_4)

    z = np.correlate(self.y, self.x, 'full')
    assert_array_almost_equal(z, self.z2)

    z = np.correlate(self.x[::-1], self.y, 'full')
    assert_array_almost_equal(z, self.z1r)

    z = np.correlate(self.y, self.x[::-1], 'full')
    assert_array_almost_equal(z, self.z2r)

    z = np.correlate(self.xs, self.y, 'full')
    assert_array_almost_equal(z, self.zs)
def test_object(self):
    """Check full-mode ``np.correlate`` on ``Decimal`` object arrays."""
    self._setup(Decimal)

    forward = np.correlate(self.x, self.y, mode='full')
    assert_array_almost_equal(forward, self.z1)

    swapped = np.correlate(self.y, self.x, mode='full')
    assert_array_almost_equal(swapped, self.z2)
def test_no_overwrite(self):
    """``np.correlate`` must leave both of its input arrays unchanged."""
    data = np.ones(100)
    kernel = np.ones(3)

    np.correlate(data, kernel)

    assert_array_equal(data, np.ones(100))
    assert_array_equal(kernel, np.ones(3))
def test_complex(self):
    """Check full-mode ``np.correlate`` on complex inputs.

    The reference is given for ``correlate(x, y)``; swapping the operands
    yields the reversed, conjugated sequence, which is what is compared.
    """
    # ``np.complex`` was only an alias of the builtin ``complex`` and has been
    # removed from NumPy; the builtin selects the same dtype.
    x = np.array([1, 2, 3, 4+1j], dtype=complex)
    y = np.array([-1, -2j, 3+1j], dtype=complex)
    r_z = np.array([3-1j, 6, 8+1j, 11+5j, -5+8j, -4-1j], dtype=complex)
    r_z = r_z[::-1].conjugate()

    z = np.correlate(y, x, mode='full')
    assert_array_almost_equal(z, r_z)
def align(l1, l2, axis):
    """Align two 2-D arrays along ``axis`` using their projected profiles.

    Both arrays are summed over the other axis, the two profiles are
    cross-correlated, and whichever array lags is shifted (zero-filled) by the
    offset of the correlation peak.

    Parameters
    ----------
    l1, l2 : numpy.ndarray
        2-D arrays to align.
    axis : int
        ``1`` for horizontal alignment (shift along columns); any other value
        is handled as vertical alignment (shift along rows).

    Returns
    -------
    tuple
        ``(posErr, l1, l2)``: the integer offset of the correlation peak from
        zero lag and the (possibly shifted) arrays. A positive offset shifts
        ``l2``, a negative one shifts ``l1``; the inputs are not modified.
    """
    # Compute the correlation of the profiles along the alignment axis.
    sc1 = np.sum(l1, axis=1 - axis)
    sc2 = np.sum(l2, axis=1 - axis)
    cor = np.correlate(sc1, sc2, "same")

    # Floor division keeps the offset an integer; true division yields a
    # float, which cannot be used as a slice bound below.
    posErr = np.argmax(cor) - sc1.shape[0] // 2

    # Place at the right position.
    if posErr > 0:
        l2 = _shift_zero_filled(l2, posErr, axis == 1)
    elif posErr < 0:
        l1 = _shift_zero_filled(l1, -posErr, axis == 1)

    return posErr, l1, l2


def _shift_zero_filled(image, offset, along_columns):
    """Return a zero-filled copy of ``image`` shifted by ``offset`` (> 0)."""
    shifted = image.copy()
    shifted[:] = 0
    if along_columns:
        shifted[:, offset:] = image[:, :-offset]
    else:
        shifted[offset:, :] = image[:-offset, :]
    return shifted
def autocorr(x):
    """Return the autocorrelation of ``x`` for non-negative lags.

    The input is centred on its mean and the result is scaled so that the
    zero-lag value equals 1.

    Parameters
    ----------
    x : array_like
        1-D input series.

    Returns
    -------
    numpy.ndarray
        Values for lags ``0 .. len(x) - 1``.
    """
    centred = x - np.mean(x)
    full = np.correlate(centred, centred, mode='full')
    # Zero lag sits in the middle of the full correlation. Floor division is
    # required: a float index raises TypeError.
    zero_lag = full.size // 2
    return full[zero_lag:] / full[zero_lag]
def time_delay_func_paralel(start, end, outs, multi, sample_rate=44100.0):
    """Estimate the delay of channels ``start .. end-1`` relative to channel 0.

    Parameters
    ----------
    start, end : int
        Half-open range of channel indices to process.
    outs : mutable sequence or mapping
        Receives the delay in seconds at ``outs[idx]`` for each channel.
    multi : numpy.ndarray
        Indexed as ``multi[channel][:, 0]``; only column 0 of each channel is
        used.
    sample_rate : float, optional
        Samples per second used to convert lags to seconds (default 44100).

    Notes
    -----
    A positive result means channel ``idx`` lags channel 0; identical
    channels give exactly 0.
    """
    reference = multi[0][:, 0]
    for idx in range(start, end):
        print('locating ... %s' % getpid())
        c = numpy.correlate(reference, multi[idx][:, 0], "full")
        peak = c.argmax(0)
        # With 0-based indexing the zero-lag element of a full correlation of
        # equal-length inputs is at (len(c) - 1) / 2; the previous
        # (len(c) + 1) / 2 was the 1-based centre and added one sample.
        zero_lag = (float(len(c)) - 1.0) / 2.0
        outs[idx] = (zero_lag - peak) / sample_rate
def time_delay_func(x, y, sample_rate=44100.0):
    """Estimate the delay, in seconds, of ``y`` relative to ``x``.

    Parameters
    ----------
    x, y : numpy.ndarray
        2-D arrays; only column 0 of each is used.
    sample_rate : float, optional
        Samples per second used to convert the lag to seconds (default 44100).

    Returns
    -------
    float
        Positive when ``y`` lags ``x``; exactly 0 for identical inputs.
    """
    print('locating ...')
    first = x[:, 0]
    second = y[:, 0]
    c = numpy.correlate(first, second, "full")
    peak = c.argmax(0)
    # In a full correlation the zero-lag element is at index len(second) - 1
    # (0-based). The previous (len(c) + 1) / 2 was the 1-based centre and
    # added one sample to every result.
    zero_lag = len(second) - 1
    return (zero_lag - peak) / float(sample_rate)
def fst_delay_snd(fst, snd, samp_rate, max_delay):
    """Cross-correlate two equal-length 1-D signals and locate the peak.

    Parameters
    ----------
    fst, snd : numpy.ndarray
        1-D arrays of the same length.
    samp_rate : number
        Divisor that converts the peak offset (in samples) to time.
    max_delay :
        Not used by this function.

    Returns
    -------
    tuple
        ``(corr, delay)``: the 'same'-mode correlation of the inputs (as
        doubles) and ``(argmax(corr) - len // 2) / samp_rate``.

    Raises
    ------
    Exception
        If either input is not 1-D or the lengths differ.
    """
    # Verify argument shape.
    shape_a, shape_b = fst.shape, snd.shape
    both_1d = len(shape_a) == 1 and len(shape_b) == 1
    if not (both_1d and shape_a[0] == shape_b[0]):
        raise Exception("Argument shape invalid, in 'fst_delay_snd' function")

    centre = shape_a[0] // 2
    first = numpy.array(fst, dtype=numpy.double)
    second = numpy.array(snd, dtype=numpy.double)
    corr = numpy.correlate(first, second, 'same')
    peak = numpy.argmax(corr)
    return corr, (peak - centre) / samp_rate
def fst_delay_snd(fst, snd, samp_rate):
    """Return the offset of the cross-correlation peak of two 1-D signals.

    Parameters
    ----------
    fst, snd : numpy.ndarray
        1-D arrays of the same length.
    samp_rate : number
        Divisor that converts the peak offset (in samples) to time.

    Returns
    -------
    ``(argmax(corr) - len // 2) / samp_rate`` where ``corr`` is the
    'same'-mode correlation of ``fst`` with ``snd``.

    Raises
    ------
    Exception
        If either input is not 1-D or the lengths differ.
    """
    # Verify argument shape.
    fst_shape = fst.shape
    snd_shape = snd.shape
    shapes_ok = (
        len(fst_shape) == 1
        and len(snd_shape) == 1
        and fst_shape[0] == snd_shape[0]
    )
    if not shapes_ok:
        raise Exception("Argument shape invalid, in 'fst_delay_snd' function")

    midpoint = fst_shape[0] // 2
    peak_index = numpy.argmax(numpy.correlate(fst, snd, 'same'))
    return (peak_index - midpoint) / samp_rate
def rolling_mean(X, window_size):
    """Return the moving average of ``X`` over ``window_size`` samples.

    Only positions where the window fits entirely inside ``X`` are returned
    ('valid' mode), so the result has ``len(X) - window_size + 1`` entries.
    """
    # Uniform weights that sum to one.
    weights = np.full(window_size, 1.0 / window_size)
    return np.correlate(X, weights, 'valid')
def autoCorrelation(s):
    """Compute the autocorrelation along axis 0 of ``s``.

    Each 1-D slice taken along axis 0 is centred on its mean, correlated with
    itself, and every non-negative lag is divided by
    ``variance * (number of overlapping samples)``.

    Returns an array with the same axis-0 length as ``s``.
    """
    def ac1d(x):
        variance = x.var()
        centred = x - x.mean()
        full = np.correlate(centred, centred, mode='full')
        overlap = np.arange(x.shape[0], 0, -1)
        # The second half of the full correlation holds the lags >= 0.
        return full[full.size // 2:] / (variance * overlap)

    return np.apply_along_axis(ac1d, 0, s)
def dodemod(rx, P: dict):
    """Demodulate ``rx`` according to ``P['demod']``.

    Parameters
    ----------
    rx : numpy.ndarray
        Received samples.
    P : dict
        Reads ``rxfs`` and ``demod``; for ``'chirp'`` also ``txfn``, ``txfs``,
        ``pri`` and ``Npulse``; for ``'am'``/``'ssb'`` also ``again`` and
        ``fc``.

    Returns
    -------
    tuple
        ``(aud, fs)``: the demodulated audio (``None`` for chirp mode or an
        unknown mode) and the sample rate in effect.

    Notes
    -----
    ``loadbin``, ``UP``, ``DOWN``, ``plotxcor``, ``draw``, ``pause``,
    ``am_demod``, ``ssb_demod``, ``fsaudio`` and ``p`` are taken from the
    enclosing module.
    """
    aud = None
    fs = P['rxfs']

    if P['demod'] == 'chirp':
        tx = loadbin(P['txfn'], P['txfs'])
        if tx is None:
            # No transmit file: use the received data as the reference and
            # replace rx by an attenuated copy plus complex noise.
            warnings.warn('simulated chirp reception')
            tx = rx
            rx = 0.05*rx + 0.1*rx.max()*(np.random.randn(rx.size) + 1j*np.random.randn(rx.size))
            txfs = fs
        else:
            rx = scipy.signal.resample_poly(rx, UP, DOWN)
            fs = txfs = P['txfs']

        txsec = tx.size / txfs  # length of TX in seconds
        # Fall back to the TX duration only when no PRI was configured;
        # previously `pri` stayed unbound whenever P['pri'] was supplied.
        pri = txsec if P['pri'] is None else P['pri']

        print(f'Using {pri*1000} ms PRI and {P["Npulse"]} pulses incoherently integrated')
        # %% integration
        NrxPRI = int(fs * pri)  # Number of RX samples per PRI
        NrxStack = rx.size // NrxPRI  # number of complete PRIs received in this data
        Nint = NrxStack // P['Npulse']  # Number of steps we'll take iterating
        # total number of samples to extract (in general part of one PRI is
        # discarded after numerous PRIs)
        Nextract = P['Npulse'] * NrxPRI

        ax = None
        for i in range(Nint):
            ci = slice(i*Nextract, (i+1)*Nextract)
            # NOTE(review): this averages runs of Npulse consecutive samples;
            # confirm whether (Npulse, NrxPRI) with mean(axis=0) was intended.
            rxint = rx[ci].reshape((NrxPRI, P['Npulse'])).mean(axis=1)
            Rxy = np.correlate(tx, rxint, 'full')
            ax = plotxcor(Rxy, txfs, ax)
            draw()
            pause(0.5)
    elif P['demod'] == 'am':
        # NOTE(review): lowercase `p` is not defined in this block -- confirm
        # it is a module-level object distinct from the `P` dict.
        aud = am_demod(P['again']*rx, fs, fsaudio, P['fc'], p.audiobw, frumble=p.frumble, verbose=True)
    elif P['demod'] == 'ssb':
        aud = ssb_demod(P['again']*rx, fs, fsaudio, P['fc'], p.audiobw, verbose=True)

    return aud, fs
def plot(seq):
    """Plot the autocorrelation of the given sequence.

    Truthy entries of ``seq`` map to +1.0 and falsy ones to -1.0 before the
    'same'-mode autocorrelation is computed and shown in a new figure.
    """
    import matplotlib.pyplot as plt

    length = len(seq)
    signed = np.where(seq, 1.0, -1.0)
    correlation = np.correlate(signed, signed, 'same')

    plt.figure()
    plt.title("Length {} Gold code autocorrelation".format(length))
    # Horizontal axis centred on zero lag.
    lag_axis = np.arange(length) - length // 2
    plt.plot(lag_axis, correlation, '.-')
    plt.show()
def _print_stats(seq): bipolar = np.where(seq, 1.0, -1.0) autocorr = np.correlate(bipolar, bipolar, 'same') peaks = np.sort(np.abs(autocorr)) peak = peaks[-1] noise = np.sqrt(np.mean(peaks[:-1]**2)) peak_to_peak2 = peak / peaks[-2] peak_to_noise = peak / noise print("Peak amplitude: {:.0f}".format(peak)) print("Largest non-peak amplitude: {:.0f}".format(peaks[-2])) print("Peak-to-max: {:.2f}".format(peak_to_peak2)) print("Peak-to-noise: {:.2f}".format(peak_to_noise))
def correlationIndividual(data, idx=(0, 1), cls=-1, delay=(-100, 100)):
    """Calculate cross- and auto-correlations in time between the measures.

    Parameters
    ----------
    data : numpy.ndarray
        2-D array; the last column is excluded when computing column means.
    idx : sequence
        Only its length ``n`` is used: columns ``0 .. n-1`` of ``data`` are
        correlated with each other.
    cls :
        Not used by this function.
    delay : tuple
        ``(first_lag, last_lag)`` bounds of the lag range.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(last_lag - first_lag + 1, n, n)``.
    """
    n = len(idx)
    means = np.mean(data[:, :-1], axis=0)

    first_lag = delay[0]
    last_lag = delay[1]
    cc = np.zeros((last_lag - first_lag + 1, n, n))

    # np.ndindex walks (i, j) in the same row-major order as two nested loops.
    for i, j in np.ndindex(n, n):
        if first_lag < 0:
            # Negative-lag side: column i against column j shortened at the start.
            neg_side = np.correlate(data[:, i] - means[i],
                                    data[-first_lag:, j] - means[j])
        else:
            neg_side = [0]

        if last_lag > 0:
            # Positive-lag side: roles of the two columns swapped.
            pos_side = np.correlate(data[:, j] - means[j],
                                    data[last_lag:, i] - means[i])
        else:
            pos_side = [0]

        # Drop the first negative-side entry and reverse the positive side
        # before joining the two halves.
        combined = np.concatenate((neg_side[1:], pos_side[::-1]))

        if first_lag > 0:
            cc[:, i, j] = combined[first_lag:]
        elif last_lag < 0:
            cc[:, i, j] = combined[:-last_lag]
        else:
            cc[:, i, j] = combined

    return cc
def correlationIndividualStages(data, idx=(0, 1), cls=-1, delay=(-100, 100)):
    """Calculate correlation functions in time between the measures per stage.

    Parameters
    ----------
    data : numpy.ndarray
        2-D array; the last column is excluded when computing column means.
    idx : sequence
        Only its length ``n`` is used: columns ``0 .. n-1`` are correlated.
    cls :
        Forwarded to ``stageIndex`` to obtain the stage boundaries.
    delay : tuple
        ``(first_lag, last_lag)`` bounds of the lag range.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(last_lag - first_lag + 1, n, n, number_of_stages)``.
    """
    # Stage boundaries, padded with 0 at the front and the row count at the end.
    boundaries = stageIndex(data, cls=cls)
    boundaries = np.insert(np.append(boundaries, data.shape[0]), 0, 0)
    stage_count = len(boundaries) - 1

    n = len(idx)
    # Means are taken over the whole data set, not per stage.
    means = np.mean(data[:, :-1], axis=0)

    first_lag = delay[0]
    last_lag = delay[1]
    cc = np.zeros((last_lag - first_lag + 1, n, n, stage_count))

    for s in range(stage_count):
        dat = data[boundaries[s]:boundaries[s + 1], :]
        # np.ndindex walks (i, j) in the same order as two nested loops.
        for i, j in np.ndindex(n, n):
            if first_lag < 0:
                neg_side = np.correlate(dat[:, i] - means[i],
                                        dat[-first_lag:, j] - means[j])
            else:
                neg_side = [0]

            if last_lag > 0:
                pos_side = np.correlate(dat[:, j] - means[j],
                                        dat[last_lag:, i] - means[i])
            else:
                pos_side = [0]

            combined = np.concatenate((neg_side[1:], pos_side[::-1]))

            if first_lag > 0:
                cc[:, i, j, s] = combined[first_lag:]
            elif last_lag < 0:
                cc[:, i, j, s] = combined[:-last_lag]
            else:
                cc[:, i, j, s] = combined

    return cc
def correlogram(experiment):
    """Average the 'same'-mode left/right cross-correlation over all trials.

    ``experiment.getresults('left')`` and ``experiment.getresults('right')``
    are paired up trial by trial; the per-trial correlations are averaged
    element-wise.
    """
    left_trials = experiment.getresults('left')
    right_trials = experiment.getresults('right')

    per_trial = []
    for left_signal, right_signal in zip(left_trials, right_trials):
        per_trial.append(np.correlate(left_signal, right_signal, 'same'))

    return np.mean(per_trial, axis=0)
def five_group_stats(group):
    """Compute summary statistics for one group of sales records.

    Parameters
    ----------
    group :
        Object indexable by the column names ``'Demanda_uni_equil'``,
        ``'Semana'`` and ``'Dev_proxima'`` (each exposing ``.values`` /
        ``.mean()``).
        NOTE(review): presumably a pandas group -- confirm with the caller.

    Returns
    -------
    tuple
        ``(mean, N, std, median, sales at the largest week, largest week,
        mean returns, week signature, kurtosis, harmonic mean, entropy, ci,
        correlation with time order, mean autocorrelation,
        number of mean crossings)``.
    """
    sales = np.array(group['Demanda_uni_equil'].values)
    samana = group['Semana'].values
    max_index = np.argmax(samana)
    returns = group['Dev_proxima'].mean()

    # Signature encoding in which weeks the sales happen.
    sorted_samana_index = np.argsort(samana)
    sorted_sales = sales[sorted_samana_index]
    signature = np.sum([math.pow(2, s - 3) for s in samana])

    kurtosis = fillna_and_inf(scipy.stats.kurtosis(sorted_sales))
    # Negative sales are replaced by 0.1 before taking the harmonic mean.
    hmean = fillna_and_inf(scipy.stats.hmean(np.where(sales < 0, 0.1, sales)))
    entropy = fillna_and_inf(scipy.stats.entropy(sales))
    std = fillna_and_inf(np.std(sales))
    N = len(sales)
    ci = fillna_and_inf(calculate_ci(std, N))
    corr = fillna_and_inf(scipy.stats.pearsonr(range(N), sorted_sales)[0])

    autocorr_list = np.correlate(sorted_sales, sorted_sales, mode='same')
    mean_autocorr = fillna_and_inf(np.mean(autocorr_list))

    mean = np.mean(sales)

    # Count how often the week-ordered sales cross their mean.
    mean_corss_points_count = 0
    if N > 1:
        high_than_mean = mean < sorted_sales[0]
        for value in sorted_sales[1:]:
            crossed_down = high_than_mean and mean > value
            # The upward case must test `mean < value`; it used to repeat the
            # downward comparison.
            crossed_up = (not high_than_mean) and mean < value
            if crossed_down or crossed_up:
                # Was `count += count`, which left the counter at zero.
                mean_corss_points_count += 1
            high_than_mean = mean < value

    return mean, N, std, np.median(sales), sales[max_index], samana[max_index], \
        returns, signature, kurtosis, hmean, entropy, ci, corr, mean_autocorr, mean_corss_points_count
def angle_from_audio(self, file_name, chunks):
    """Estimate the arrival angle of sound from a two-channel WAV file.

    The file is processed in windows of ``chunks`` samples (windows
    ``1 .. chunks-1``). For each window channel 0 is correlated against
    channel 1 widened by ``self.buffer`` samples on both sides; the lag of the
    peak is converted to a path difference and, if it is non-zero and no
    larger than ``self.mic_dist``, to an angle appended to ``self.angles``.
    The left edge of the fullest of 10 histogram bins over all collected
    angles is stored in ``self.angle_pred`` and printed.

    Parameters
    ----------
    file_name : str
        Path of the WAV file to read.
    chunks : int
        Used both as the window length in samples and as the loop bound.
    """
    rate, wave = wavfile.read(file_name)
    raw_0 = wave[:, 0].astype(np.float64)
    raw_1 = wave[:, 1].astype(np.float64)

    for i in range(1, chunks):
        start = i * chunks
        end = (i + 1) * chunks

        left = raw_0[start:end]
        right = raw_1[start - self.buffer:end + self.buffer]

        corr_arr = np.correlate(right, left, 'valid')
        # Integer centre of the lag axis; true division would bias every lag
        # by half a sample (and zero lag would never register as zero).
        max_index = len(corr_arr) // 2 - np.argmax(corr_arr)

        time_d = max_index / float(rate)
        signal_dist = time_d * self.sound_speed

        if signal_dist != 0 and abs(signal_dist) <= self.mic_dist:
            angle = math.degrees(math.asin(signal_dist / self.mic_dist))
            self.angles.append(angle)

    a = np.array(self.angles)
    hist, bins = np.histogram(a, bins=10)
    index = np.argmax(hist)
    self.angle_pred = bins[index]
    print(self.angle_pred)
def autocorrelation(x, lags):
    """Temporal correlation of ``x`` for lags ``1 .. lags``.

    For each lag the overlapping head and tail of the series are centred on
    their own means, multiplied and summed, then divided by the product of
    their standard deviations and the overlap length.

    Parameters
    ----------
    x : array_like
        1-D input series.
    lags : int
        Largest lag to evaluate.

    Returns
    -------
    list
        One coefficient per lag, starting with lag 1.
    """
    n = len(x)
    x = np.array(x)

    result = []
    for lag in range(1, lags + 1):
        later = x[lag:]
        earlier = x[:n - lag]
        # Equal-length inputs in 'valid' mode give a single value.
        covariance = np.correlate(later - later.mean(),
                                  earlier - earlier.mean())[0]
        scale = later.std() * earlier.std() * (n - lag)
        result.append(covariance / scale)
    return result