我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用numpy.convolve()。
def multinomial_entropy(probs, count):
    """Compute entropy of multinomial distribution with given probs and count.

    The distribution over ``count`` draws is obtained by convolving ``probs``
    with itself ``count - 1`` times before handing it to ``entropy``.

    Args:
        probs: A 1-dimensional array of normalized probabilities.
        count: The number of draws in a multinomial distribution.

    Returns:
        A number in [0, count * len(probs)] representing entropy.

    Raises:
        NotImplementedError: If ``count > 1`` and ``probs`` has more than
            two entries.
    """
    assert count > 0
    # A single draw needs no convolution, so any number of outcomes is
    # accepted; repeated draws are restricted to at most two outcomes.
    if count > 1 and len(probs) > 2:
        raise NotImplementedError(
            'Only categorical and binomial are supported')
    multi_probs = probs
    for _ in range(count - 1):
        multi_probs = np.convolve(multi_probs, probs)
    return entropy(multi_probs)
def cal_running_std(event_freq, n=16):
    """Calculate the running spread of the event counts.

    Parameters
    ----------
    event_freq : numpy.ndarray
        Event frequency count in given window; only column 1 is read.
    n : int
        Running window for computing STD

    Returns
    -------
    o : numpy.ndarray
        ``(sum(v**2) - sum(v)**2 / n) / (n - 1)`` for every full window of
        ``n`` consecutive values ``v``.
        NOTE(review): no square root is taken, so this is the sample
        variance rather than the standard deviation the name suggests -
        confirm which one callers expect.
    """
    counts = event_freq[:, 1]
    box = np.ones((n, ))
    # Windowed sums of v**2 and of v, one entry per full window.
    sum_of_squares = np.convolve(counts**2, box, mode="valid")
    plain_sum = np.convolve(counts, box, mode="valid")
    return (sum_of_squares - plain_sum**2 / n) / float(n - 1)
def smoothCurve(X, Fac):
    """Upsample the columns of ``X`` and smooth them with a box filter.

    Each column is resampled onto ``NPoints * Fac`` positions with
    ``interp.spline`` and then averaged with a ``2 * Fac`` wide box kernel.
    Finally the last row and ``2 * Fac`` rows at each end are dropped.

    NOTE(review): ``interp`` is presumably ``scipy.interpolate``; its
    ``spline`` helper no longer exists in recent SciPy releases - confirm.

    :param X: 2-D array indexed as ``X[point, dimension]``
    :param Fac: integer upsampling factor
    :returns: the resampled, smoothed and trimmed 2-D array
    """
    NPoints = X.shape[0]
    dim = X.shape[1]
    idx = range(NPoints)
    idxx = np.linspace(0, NPoints, NPoints*Fac)
    Y = np.zeros((NPoints*Fac, dim))
    NPointsOut = 0
    for ii in range(dim):
        Y[:, ii] = interp.spline(idx, X[:, ii], idxx)
        # Smooth with box filter: 2*Fac ones scaled by 0.5/Fac, i.e. a mean
        y = (0.5/Fac)*np.convolve(Y[:, ii], np.ones(Fac*2), mode='same')
        Y[0:len(y), ii] = y
        NPointsOut = len(y)
    # Drop the final row, then 2*Fac rows at both ends.
    Y = Y[0:NPointsOut-1, :]
    Y = Y[2*Fac:-2*Fac, :]
    return Y
def _plot_walk(self, ax, parameter, data, truth=None, extents=None, convolve=None, color=None):  # pragma: no cover
    """Draw the walk of one parameter on ``ax``.

    The samples are scattered against their index.  When ``convolve`` is
    given, a running mean of that width is overlaid as a dotted line; when
    ``truth`` is given, a horizontal line is drawn at that value.

    ``extents`` (if not None) is passed to ``ax.set_ylim``; ``color``
    defaults to ``"#0345A1"``.
    """
    if extents is not None:
        ax.set_ylim(extents)
    assert convolve is None or isinstance(convolve, int), \
        "Convolve must be an integer pixel window width"

    steps = np.arange(data.size)
    ax.set_xlim(0, steps[-1])
    ax.set_ylabel(parameter)

    point_color = "#0345A1" if color is None else color
    ax.scatter(steps, data, c=point_color, s=2, marker=".",
               edgecolors="none", alpha=0.5)
    tick_limit = self.parent.config["max_ticks"]
    ax.yaxis.set_major_locator(MaxNLocator(tick_limit, prune="lower"))

    if convolve is not None:
        line_color = self.parent.color_finder.scale_colour(point_color, 0.5)
        kernel = np.ones(convolve) / convolve
        running_mean = np.convolve(data, kernel, mode="same")
        # The last sample is left out of the overlay.
        ax.plot(steps[:-1], running_mean[:-1], ls=':', color=line_color,
                alpha=1)
    if truth is not None:
        ax.axhline(truth, **self.parent.config_truth)
def moving_average(data, window_size=5):
    """
    Moving average.

    Parameters
    ----------
    data : array_like
        An array of values.
    window_size : int, optional
        Moving average window size.

    Returns
    -------
    result : array_like
        Moving averaged array; only positions where the window fully
        overlaps ``data`` are returned.
    """
    width = int(window_size)
    # Every tap carries the same weight 1 / window_size.
    kernel = np.repeat(1.0 / float(window_size), width)
    return np.convolve(data, kernel, 'valid')
def boxcar(y, window_size=3):
    """
    Smooth the input vector with the mean of neighbouring values; the
    neighbourhood size is given by the window.

    Parameters
    ==========
    y : array
        The vector to be smoothed; its ``index`` is reused for the result.

    window_size : int
        An odd integer describing the window size.

    Returns
    =======
    : array
        The smoothed values as a ``Series`` carrying ``y.index``.
    """
    kernel = np.full(window_size, 1.0 / window_size)
    smoothed = np.convolve(y, kernel, mode='same')
    return Series(smoothed, index=y.index)
def gaussian(y, window_size=3, sigma=2):
    """
    Apply a gaussian filter to smooth the input vector

    Parameters
    ==========
    y : array
        The input array; its ``index`` is reused for the result.

    window_size : int
        An odd integer describing the size of the filter.

    sigma : float
        The standard deviation of the gaussian window.

    Returns
    =======
    : array
        The filtered values as a ``Series`` carrying ``y.index``.  The
        window is used as returned, without normalising it to unit sum.
    """
    # The window functions live in scipy.signal.windows; the top-level
    # scipy.signal.gaussian alias is gone in recent SciPy releases.
    filt = signal.windows.gaussian(window_size, sigma)
    return Series(signal.convolve(y, filt, mode='same'), index=y.index)
def convolve(infile, ir_name, level = 0.5):
    """
    Convolve ``infile`` with an impulse response and write the mix to disk.

    Args:
        infile (str): Filename
        ir_name: can be 'smartphone_mic' or 'classroom'
        level (float): can be between 0 and 1, default value = 0.5; weight
            of the convolved signal in the mix, the original signal gets
            ``1 - level``.

    The output is written into ``outfile_path`` and, when ``FILE_DELETION``
    is set, passed on to ``extractFeaturesAndDelete``.
    """
    fs1, x = monoWavRead(filename=infile)
    dry = np.copy(x)

    # Change the path below for the sounds folder
    ir_path = './sounds/ir_{0}.wav'.format(ir_name)
    _, ir = monoWavRead(filename=ir_path)

    # Keep only as many convolved samples as the input has.
    wet = np.convolve(dry, ir, 'full')[0:dry.shape[0]]
    y = wet * level + dry * (1 - level)

    # Change the output file name to suit your requirements here
    stem = os.path.basename(infile).split(".")[0]
    outfile_name = stem + ("{0}_convolved{1}.wav".format(ir_name, level))
    outfile = os.path.join(outfile_path, outfile_name)
    write(filename = outfile, rate = fs1, data = y)
    if FILE_DELETION:
        extractFeaturesAndDelete(outfile)
def convolve_lsf(flux,lsf):
    """Convolve ``flux`` with the kernel ``lsf``, normalised by ``sum(lsf)``.

    The convolution is applied to ``1 - flux`` and the result subtracted
    from 1 again, which removes edge effects without padding.  When ``flux``
    is shorter than ``lsf`` it is first padded in front with ones and the
    padding is cut off again so the returned length equals ``len(flux)``.
    """
    def _smooth(spectrum):
        # convolve 1-flux to remove edge effects without using padding
        return 1-np.convolve(1-spectrum,lsf,mode='same') /np.sum(lsf)

    if len(flux) >= len(np.atleast_1d(lsf)):
        return _smooth(flux)

    # Add padding to make sure to return the same length in flux.
    padding = np.ones(len(lsf)-len(flux)+1)
    padded = np.hstack([padding,flux])
    return _smooth(padded)[len(padding):]

###############################################################################
# Convergence
###############################################################################
def moving_average(x, n, type='simple'):
    """
    Compute an n period moving average.

    type is 'simple' | 'exponential'; any value other than 'simple' selects
    the exponential weights.

    The first ``n`` entries of the result are overwritten with the entry at
    index ``n``, so ``x`` must hold more than ``n`` values.
    """
    samples = np.asarray(x)
    if type == 'simple':
        raw_weights = np.ones(n)
    else:
        raw_weights = np.exp(np.linspace(-1., 0., n))
    weights = raw_weights / raw_weights.sum()

    averaged = np.convolve(samples, weights, mode='full')[:len(samples)]
    averaged[:n] = averaged[n]
    return averaged
def ma(x, win):
    """Compute the moving average of x with a window equal to win

    The first ``win - 1`` outputs are averages over the samples seen so far
    (1, 2, ... values); from then on every output averages ``win`` samples.

    Args:
        x (numpy.array): data
        win (int): window

    Returns:
        numpy.array: the smoothed data, same length as ``x``
    """
    y = np.ones(win, dtype=np.float64)
    i = win - 1
    full = np.convolve(x, y, mode='full')
    # Slice by explicit length: ``[:-i]`` would return an empty array for
    # win == 1 because ``-0`` is just ``0``.
    _x = full[:full.size - i]
    # Partial windows at the start hold 2 .. win-1 samples.
    _x[1:i] = _x[1:i] / np.arange(2., win, dtype=np.float64)
    _x[i:] = _x[i:] / float(win)
    return _x
def frequencyResponse(self, freqs=None):
    """Return ``spsig.freqz`` evaluated for this filter's coefficients.

    'allpass' and 'allstop' band types are answered with the constant
    numerators 1 and 0.  When ``zeroPhase`` is set, numerator and
    denominator are each convolved with their own reverse first.

    ``freqs`` is forwarded to ``freqz`` as ``worN``.
    """
    constant_numerator = {'allpass': 1, 'allstop': 0}
    if self.bandType in constant_numerator:
        return spsig.freqz(constant_numerator[self.bandType], worN=freqs)

    num, den = self.numCoef, self.denomCoef
    if self.zeroPhase:
        # http://www.mathworks.com/matlabcentral/newsreader/view_thread/245017
        num = np.convolve(num, num[::-1])
        den = np.convolve(den, den[::-1])

    # freqz does not preserve dtype of arguments, report bug XXX - idfah
    return spsig.freqz(num, den, worN=freqs)
def probs_to_classes(self, probabilities):
    """Turn a likelihood matrix from predict_proba into class indices.

    Every row (one per class) is smoothed with a normalised Hamming window
    before the argmax over classes is taken for each column.  A naive
    argmax gives a very noisy signal; windowing favours strongly matching
    areas.

    The window length is ``params['smooth_time']`` (default 0.1) divided by
    the spacing of the first two entries of ``active_song.time``.
    """
    smooth_time = self.params.get('smooth_time', 0.1)
    times = self.active_song.time
    dt = times[1] - times[0]
    width = int(np.round(smooth_time / dt))

    window = signal.get_window('hamming', width)
    window /= np.sum(window)

    smoothed_rows = []
    for class_idx in range(probabilities.shape[0]):
        row = probabilities[class_idx, :]
        smoothed_rows.append(np.convolve(row, window, mode='same'))
    return np.argmax(np.stack(smoothed_rows, axis=0), axis=0)
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
    """Smooth (or differentiate) ``y`` with a Savitzky-Golay filter.

    Parameters
    ----------
    y : numpy.ndarray
        1-D signal.
    window_size : int
        Length of the window; must be a positive odd number.
    order : int
        Order of the fitted polynomial; must be below ``window_size - 1``.
    deriv : int
        Order of the derivative to compute (0 means smoothing only).
    rate : number
        Scale applied as ``rate**deriv``.

    Returns
    -------
    numpy.ndarray
        Filtered signal with the same length as ``y``.

    Raises
    ------
    ValueError
        If ``window_size`` or ``order`` cannot be converted to int.
    TypeError
        If ``window_size`` is not a positive odd number or is too small
        for ``order``.
    """
    import numpy as np
    from math import factorial

    try:
        # Plain int(): the np.int alias no longer exists in current NumPy.
        window_size = abs(int(window_size))
        order = abs(int(order))
    except ValueError:
        raise ValueError("window_size and order have to be of type int")
    if window_size % 2 != 1 or window_size < 1:
        raise TypeError("window_size size must be a positive odd number")
    if window_size < order + 2:
        raise TypeError("window_size is too small for the polynomials order")

    order_range = range(order + 1)
    half_window = (window_size - 1) // 2
    # Design matrix of powers k**i; a plain ndarray replaces np.mat.
    b = np.array([[k**i for i in order_range]
                  for k in range(-half_window, half_window + 1)],
                 dtype=float)
    m = np.linalg.pinv(b)[deriv] * rate**deriv * factorial(deriv)
    # Pad both ends with values mirrored around the first / last sample.
    firstvals = y[0] - np.abs(y[1:half_window+1][::-1] - y[0])
    lastvals = y[-1] + np.abs(y[-half_window-1:-1][::-1] - y[-1])
    y = np.concatenate((firstvals, y, lastvals))
    return np.convolve(m[::-1], y, mode='valid')
def synth_audio(audiofile, impfile, chns, angle, nsfile=None, snrlevel=None, outname=None, outsplit=False):
    """Build a multi-channel signal by convolving audio with impulse files.

    The audio read from ``audiofile`` is peak-normalised and convolved with
    one float32 impulse response per channel (files named
    ``<impfile>D<angle>_ch<channel>.flt``, channels counted from 1), then
    passed through ``add_noise``.

    Returns ``(sample_rate, channels x samples array)`` when ``outname`` is
    None.  Otherwise the result is written with ``audiolab.wavwrite`` -
    one file per channel if ``outsplit`` is true, a single file if not -
    and None is returned.
    """
    FreqSamp, raw = wavfile.read(audiofile)
    samples = raw.astype(np.float32)
    audio = samples / np.amax(np.absolute(samples))

    gen_audio = np.zeros((audio.shape[0], chns), dtype=np.float32)
    for col in range(chns):
        impulse_path = '{}D{:03d}_ch{}.flt'.format(impfile, angle, col + 1)
        impulse = np.fromfile(impulse_path, dtype=np.float32)
        gen_audio[:, col] = np.convolve(audio, impulse, mode='same')
    gen_audio = add_noise(gen_audio, nsfile=nsfile, snrlevel=snrlevel)

    if outname is None:
        return FreqSamp, np.transpose(gen_audio)

    if outsplit:
        for ch in range(chns):
            audiolab.wavwrite(gen_audio[:, ch],
                              '{}_ch{:02d}.wav'.format(outname, ch),
                              fs=FreqSamp, enc='pcm16')
    else:
        audiolab.wavwrite(gen_audio, '{}.wav'.format(outname),
                          fs=FreqSamp, enc='pcm16')
    return
def savitzky_golay(y, window_size, order, deriv=0, rate=1):
    """Apply a Savitzky-Golay filter to the 1-D signal ``y``.

    ``window_size`` must be a positive odd integer of at least
    ``order + 2``; ``order`` is the polynomial order.  ``deriv`` selects
    the derivative to return (0 = smoothed signal) and ``rate`` scales it
    as ``rate**deriv``.  The result has the same length as ``y``.

    Raises ValueError when ``window_size`` or ``order`` cannot be converted
    to int, and TypeError when the window is even, non-positive or too
    small for the polynomial order.
    """
    try:
        # Builtin int() replaces the np.int alias dropped by NumPy.
        window_size = abs(int(window_size))
        order = abs(int(order))
    except ValueError:
        raise ValueError("window_size and order have to be of type int")
    if window_size % 2 != 1 or window_size < 1:
        raise TypeError("window_size size must be a positive odd number")
    if window_size < order + 2:
        raise TypeError("window_size is too small for the polynomials order")
    order_range = range(order + 1)
    half_window = (window_size - 1) // 2
    # precompute coefficients (plain ndarray instead of the np.mat class)
    offsets = range(-half_window, half_window + 1)
    b = np.array([[k**i for i in order_range] for k in offsets], dtype=float)
    m = np.linalg.pinv(b)[deriv] * rate**deriv * factorial(deriv)
    # pad the signal at the extremes with
    # values taken from the signal itself
    firstvals = y[0] - np.abs(y[1:half_window+1][::-1] - y[0])
    lastvals = y[-1] + np.abs(y[-half_window-1:-1][::-1] - y[-1])
    y = np.concatenate((firstvals, y, lastvals))
    return np.convolve(m[::-1], y, mode='valid')
def running_average(obs, ws):
    '''
    calculates a running average

    obs -- observations
    ws  -- window size (number of points to average)

    Values near both ends are rescaled by the number of points that
    actually fall inside the window.  If the computation fails for any
    reason an array of 0.5 shaped like ``obs`` is returned instead.
    '''
    ws = int(ws)
    try:
        tmp_vals = np.convolve(np.ones(ws, dtype=float)/ws, obs, mode='same')
        # fix the edges. using mode='same' assumes zeros outside the range
        if ws % 2 == 0:
            tmp_vals[:ws//2] *= float(ws)/np.arange(ws//2, ws)
            if ws//2 > 1:
                tmp_vals[-ws//2+1:] *= float(ws)/np.arange(ws-1, ws//2, -1.0)
        else:
            tmp_vals[:ws//2] *= float(ws)/np.arange(ws//2+1, ws)
            tmp_vals[-ws//2:] *= float(ws)/np.arange(ws, ws//2, -1.0)
    except Exception:
        # Best-effort fallback kept from the original code; the interactive
        # debugger breakpoint that used to sit here has been removed.
        tmp_vals = 0.5*np.ones_like(obs, dtype=float)
    return tmp_vals
def smooth(x, window_len=10, window='hanning'):
    """Smooth a 1-D sequence with a normalised window.

    The signal is extended at both ends with point-reflected copies of
    itself before the convolution, and the extension is cut off again.

    Parameters
    ----------
    x : array_like
        1-D input values.
    window_len : int
        Length of the smoothing window; for values below 3 the input is
        returned unchanged (as an array).
    window : str
        One of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman';
        'flat' gives a moving average.

    Returns
    -------
    numpy.ndarray
        The smoothed signal.

    Raises
    ------
    ValueError
        If ``x`` is not 1-D, is shorter than ``window_len``, or ``window``
        is not one of the supported names.
    """
    x = np.array(x)
    if x.ndim != 1:
        raise ValueError("smooth only accepts 1 dimension arrays.")
    if x.size < window_len:
        raise ValueError("Input vector needs to be bigger than window size.")
    if window_len < 3:
        return x
    if window not in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
        raise ValueError("Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'")
    s = np.r_[2 * x[0] - x[window_len - 1::-1], x, 2 * x[-1] - x[-1:-window_len:-1]]
    if window == 'flat':  # moving average
        w = np.ones(window_len, 'd')
    else:
        # Look the window function up by name instead of eval()-ing code.
        w = getattr(np, window)(window_len)
    y = np.convolve(w / w.sum(), s, mode='same')
    return y[window_len:-window_len + 1]
def measureLoop(self):
    """ Measure 100 values, shift them into the buffer and re-smooth it.

    If a stop was requested the flag is cleared, ``unlock()`` is called and
    nothing is measured.  Otherwise 100 readings are taken, the buffer is
    rolled by 100 rows, the readings are written into it, every channel is
    smoothed with a normalised Hanning window of length ``window_len`` and
    ``sigRepeat`` is emitted.
    """
    if self.stopRequest:
        self.stopRequest = False
        self.unlock()
        return
    # Only column 0 is filled with readings; other channels stay zero.
    data = np.zeros((100, self._data_logic.getChannels()))
    data[:, 0] = np.array([self._data_logic.getData() for i in range(100)])
    # Rolling by -100 moves the 100 oldest rows to the end of the buffer.
    self.buf = np.roll(self.buf, -100, axis=0)
    # NOTE(review): this writes rows -101..-2, leaving the very last row
    # untouched - looks like it may be meant to be [-100:]; confirm.
    self.buf[-101:-1] = data
    w = np.hanning(self.window_len)
    # Extend the buffer with reversed copies of its ends before smoothing.
    s = np.r_[self.buf[self.window_len-1:0:-1], self.buf, self.buf[-1:-self.window_len:-1]]
    for channel in range(self._data_logic.getChannels()):
        convolved = np.convolve(w/w.sum(), s[:, channel], mode='valid')
        self.smooth[:, channel] = convolved
    self.sigRepeat.emit()
def sobel(x, window_len=7):
    """Sobel differential filter for calculating KDP.

    This solution has been taken from StackOverflow :cite:`Sobel-linfit`

    The input is extended at both ends with reversed copies of itself and
    convolved with a linear ramp kernel running from -1 to 1 (normalised by
    the sum of its absolute values).

    Returns
    -------
    output : differential signal (unscaled for gate spacing), same length
        as ``x``
    """
    padded = np.r_[x[window_len - 1:0:-1], x, x[-1:-window_len:-1]]
    ramp = 2.0 * np.arange(window_len) / (window_len - 1.0) - 1.0
    kernel = ramp / (abs(ramp).sum())
    response = np.convolve(kernel, padded, mode='valid')
    half = int(window_len / 2)
    trimmed = response[half:len(x) + half]
    return (-1.0 * trimmed / (window_len / 3.0))
def get_moving_average(x, n, type_str):
    """Compute an n period moving average and return it as strings.

    Parameters
    ----------
    x : array_like
        Input values; must hold more than ``n`` entries.
    n : int
        Number of periods in the window.
    type_str : str
        'simple' (equal weights), 'exponential' or 'weight' (linearly
        decreasing weights ``n, n-1, ..., 1``).

    Returns
    -------
    list of str
        One string per input value.  The first ``n`` entries repeat the
        value at index ``n``.

    Raises
    ------
    ValueError
        If ``type_str`` is not one of the supported names (previously this
        surfaced as an unbound-variable error).
    """
    x = np.asarray(x)
    if type_str == 'simple':
        weights = np.ones(n)
    elif type_str == 'exponential':
        weights = np.exp(np.linspace(-1., 0., n))
    elif type_str == 'weight':
        weights = np.flipud(np.arange(1, n + 1, dtype=float))
    else:
        raise ValueError(
            "type_str must be 'simple', 'exponential' or 'weight'")
    weights /= weights.sum()
    a = np.convolve(x, weights, mode='full')[:len(x)]
    a[:n] = a[n]
    # Direct iteration replaces the Python 2-only xrange index loop; each
    # element is handed to array_str as a 0-d array.
    return [np.array_str(np.asarray(value)) for value in a]
def testCausalConv(self):
    """Tests that the op is equivalent to a numpy implementation."""
    ramp = np.arange(1, 21, dtype=np.float32)
    # Two identical batch entries of shape [20, 1].
    batch = np.concatenate([ramp, ramp]).reshape([2, 20, 1])
    taps = np.array([1, 1], dtype=np.float32).reshape([2, 1, 1])
    out = causal_conv(batch, taps, 4)

    with self.test_session() as sess:
        result = sess.run(out)

    # Causal convolution using numpy
    single_ref = np.convolve(ramp, [1, 0, 0, 0, 1], mode='valid')
    ref = np.reshape(np.append(single_ref, single_ref), [2, 16, 1])

    self.assertAllEqual(result, ref)
def _convolve_or_correlate(f, a, v, mode, propagate_mask):
    """
    Helper function for ma.correlate and ma.convolve

    Parameters
    ----------
    f : callable
        The unmasked operation, called as ``f(a, v, mode=mode)``.
    a, v : array_like
        Input sequences, possibly masked.
    mode : str
        Forwarded to every call of ``f``.
    propagate_mask : bool
        If true, an output element is masked when any contributing input
        element is masked; otherwise only when no pair of valid elements
        contributes to it.

    Returns
    -------
    masked_array
        Result of ``f`` together with the combined mask.
    """
    if propagate_mask:
        # results which are contributed to by either item in any pair being
        # invalid (builtin bool replaces the removed np.bool alias)
        mask = (
            f(getmaskarray(a), np.ones(np.shape(v), dtype=bool), mode=mode)
            | f(np.ones(np.shape(a), dtype=bool), getmaskarray(v), mode=mode)
        )
        data = f(getdata(a), getdata(v), mode=mode)
    else:
        # results which are not contributed to by any pair of valid elements
        # (mode must be forwarded so that mask and data share one shape)
        mask = ~f(~getmaskarray(a), ~getmaskarray(v), mode=mode)
        data = f(filled(a, 0), filled(v, 0), mode=mode)

    return masked_array(data, mask=mask)
def _zseries_mul(z1, z2):
    """Multiply two z-series.

    The product of two z-series is again a z-series.

    Parameters
    ----------
    z1, z2 : 1-D ndarray
        The arrays must be 1-D but this is not checked.

    Returns
    -------
    product : 1-D ndarray
        The product z-series.

    Notes
    -----
    This is simply convolution. Writing S for symmetric and A for
    anti-symmetric z-series, the following rules apply:

    S*S, A*A -> S
    S*A, A*S -> A

    """
    product = np.convolve(z1, z2)
    return product
def plot_bitcoin():
    """Plot daily closing prices together with their 30 day average.

    Reads the first 1625 rows of 'coindesk-bpi-USD-close.csv', smooths the
    ``Close`` column with a 30-sample boxcar, shifts the result right by
    half the window and saves the figure under the root 'convolution1'.
    """
    nrows = 1625
    df = pandas.read_csv('coindesk-bpi-USD-close.csv',
                         nrows=nrows, parse_dates=[0])
    ys = df.Close.values

    width = 30
    window = numpy.full(width, 1.0 / width)
    smoothed = numpy.convolve(ys, window, mode='valid')
    smoothed = thinkdsp.shift_right(smoothed, len(window) // 2)

    thinkplot.plot(ys, color='0.7', label='daily')
    thinkplot.plot(smoothed, label='30 day average')
    thinkplot.config(xlabel='time (days)',
                     ylabel='price',
                     xlim=[0, nrows],
                     # ylim=[-60, 60],
                     loc='lower right')
    thinkplot.save(root='convolution1')
def MA(x, n, type='simple'):
    """
    Compute an n period moving average.

    type is 'simple' | 'exponential' (anything but 'simple' is treated as
    exponential).  The leading ``n`` results are replaced by the result at
    index ``n``, which requires more than ``n`` input values.
    """
    values = np.asarray(x)
    use_flat = type == 'simple'
    weights = np.ones(n) if use_flat else np.exp(np.linspace(-1., 0., n))
    weights /= weights.sum()

    full = np.convolve(values, weights, mode='full')
    a = full[:len(values)]
    a[:n] = a[n]
    return a
def filter(self, X, Y):
    """Apply the configured Savitzky-Golay kernel to ``Y``.

    The data is first passed through ``simplefill`` (when ``interpolate``
    is set) or ``sortxy``.  The kernel is built from ``self.order``,
    ``self.window_size`` and ``self.deriv``.

    Returns the (possibly re-ordered) ``X`` and the filtered ``Y``.

    NOTE(review): the kernel is not reversed before ``np.convolve`` and is
    not scaled by ``factorial(deriv)``; for ``deriv = 0`` this makes no
    difference - confirm the intended behaviour for derivatives.
    """
    if self.interpolate:
        X, Y = self.simplefill(X, Y)
    else:
        X, Y = self.sortxy(X, Y)
    order_range = list(range(self.order+1))
    half_window = (self.window_size - 1) // 2
    # precompute coefficients (plain ndarray: np.mat is gone in NumPy 2)
    b = np.array([[k**i for i in order_range]
                  for k in range(-half_window, half_window+1)], dtype=float)
    m = np.linalg.pinv(b)[self.deriv]
    # pad the signal at the extremes with
    # values taken from the signal itself
    firstvals = Y[0] - np.abs(Y[1:half_window+1][::-1] - Y[0])
    lastvals = Y[-1] + np.abs(Y[-half_window-1:-1][::-1] - Y[-1])
    Y1 = np.concatenate((firstvals, Y, lastvals))
    Y2 = np.convolve(m, Y1, mode='valid')
    return X, Y2
def test_fft_convolution(self):
    """Check that FFT convolution agrees with ``np.convolve`` ('same')."""
    # Small hand-checked case.
    x = np.array([1, 2, 3])
    h = np.array([0, 1, 0.5])
    expected_result = np.array([1., 2.5, 4.])

    result_np = np.convolve(x, h, 'same')
    self.assertTrue(np.array_equal(result_np, expected_result))
    result_fft = Filter.fft_convolve_1d(x, h)
    self.assertTrue(np.array_equal(result_fft, expected_result))

    # Larger complex signal against a designed band-pass kernel.
    x = np.linspace(0, 1, num=10 ** 3).astype(np.complex64)
    h = Filter.design_windowed_sinc_bandpass(0.1, 0.4, 0.01)
    # fft convolve is faster if IR is round about 400 samples or windowed sinc has bandwidth of 0.01
    #print(len(h))

    started_np = time.time()
    result_np = np.convolve(x, h, mode="same")
    t_np = time.time() - started_np

    started_fft = time.time()
    result_fft = Filter.fft_convolve_1d(x, h)
    t_fft = time.time() - started_fft

    np.testing.assert_array_almost_equal(result_np, result_fft)
    #print("fft convolve time", t_fft, "np convolve time", t_np)
def apply_bandpass_filter(data, f_low, f_high, filter_bw=0.08):
    """Band-pass ``data`` between ``f_low`` and ``f_high``.

    The two edge frequencies are swapped if given in the wrong order and
    clipped to [-0.5, 0.5] before a windowed-sinc kernel of bandwidth
    ``filter_bw`` is designed.  Short kernels are applied with
    ``np.convolve`` ('same'), longer ones with the FFT based convolution.
    """
    lower, upper = (f_high, f_low) if f_low > f_high else (f_low, f_high)
    lower = util.clip(lower, -0.5, 0.5)
    upper = util.clip(upper, -0.5, 0.5)

    h = Filter.design_windowed_sinc_bandpass(lower, upper, filter_bw)

    # Choose normal or FFT convolution based on heuristic described in
    # https://softwareengineering.stackexchange.com/questions/171757/computational-complexity-of-correlation-in-time-vs-multiplication-in-frequency-s/
    use_fft = not (len(h) < 8 * math.log(math.sqrt(len(data))))
    if use_fft:
        logger.debug("Use FFT convolve")
        return Filter.fft_convolve_1d(data, h)

    logger.debug("Use normal convolve")
    return np.convolve(data, h, 'same')
def convolve(self, other):
    """Convolves two Spectrums.

    Both spectrums must share the same frequencies.  For a full spectrum
    the amplitudes are shifted with ``fftshift`` before the convolution
    and shifted back afterwards.

    other: Spectrum

    returns: Spectrum
    """
    assert all(self.fs == other.fs)
    if not self.full:
        # not sure this branch would mean very much
        hs = np.convolve(self.hs, other.hs, mode='same')
    else:
        centered_self = np.fft.fftshift(self.hs)
        centered_other = np.fft.fftshift(other.hs)
        centered = np.convolve(centered_self, centered_other, mode='same')
        hs = np.fft.ifftshift(centered)

    return Spectrum(hs, self.fs, self.framerate, self.full)
def convolve(self, other):
    """Convolves two waves.

    Note: this operation ignores the timestamps; the result
    is built from the samples and the framerate of self only.

    other: Wave or NumPy array (a Wave must have the same framerate)

    returns: Wave
    """
    window = other
    if isinstance(other, Wave):
        assert self.framerate == other.framerate
        window = other.ys

    convolved = np.convolve(self.ys, window, mode='full')
    return Wave(convolved, framerate=self.framerate)
def movingaverage (values, window):
    """Return the simple moving average of ``values``.

    ``window`` is the number of samples averaged; only positions where the
    window fully overlaps the data are returned.
    """
    weights = np.ones(window) / window
    return np.convolve(values, weights, 'valid')
def moving_average(X, win_size=3):
    """Average ``X`` over a sliding window of ``win_size`` samples.

    Uses 'valid' convolution, so the result has
    ``len(X) - win_size + 1`` entries.
    """
    kernel = np.repeat(1.0, win_size)
    kernel = kernel / win_size
    averaged = np.convolve(X, kernel, 'valid')
    return averaged
def smooth(votes, **params):
    """Compute the convolution with a Gaussian signal.

    Parameters
    ----------
    votes : dict
        Maps a key to a 1-D sequence of votes.
    **params
        ``window`` (default 50) is the length of the Gaussian window and
        ``std`` (default 20) its standard deviation.

    Returns
    -------
    dict
        For every key of ``votes`` the sequence convolved with the window
        using ``mode='same'``.
    """
    window = params.get('window', 50)
    std = params.get('std', 20)
    window = gaussian(window, std=std)
    # dict.items() works on Python 2 and 3; iteritems() is Python 2-only.
    return {k: convolve(vote, window, mode='same')
            for k, vote in votes.items()}
def movingaverage(interval, window_size):
    """Smooth ``interval`` with a boxcar of ``window_size`` samples.

    Uses 'same' convolution, so values near both ends are averaged
    against implicit zeros outside the data.
    """
    taps = int(window_size)
    kernel = np.full(taps, 1.0) / float(window_size)
    return np.convolve(interval, kernel, 'same')

# -----------------------------------------------------------------------------
# AAG Cloud Sensor Class
# -----------------------------------------------------------------------------
def __init__(self, n, axis, order=3, mode='valid'):
    """
    Construct a second-order gradient operator for signal of
    dimension *n* for dimension *axis*. Use a filter kernel of
    length *order* (must be odd). Use convolution type *mode*.

    The kernel along *axis* is the differentiator convolved with itself;
    all other dimensions get the single-tap kernel ``[1]``.
    """
    # assert that the filter length is odd
    assert(order % 2 == 1)
    self.n = n
    self.ndim = len(self.n)
    self.axis = axis
    if axis < 0 or axis >= self.ndim:
        raise ValueError('0 <= axis (= {0}) < ndim = {1}'.format(axis, self.ndim))

    self.d = differentiator(int(order/2) + 1)
    self.d2 = NP.convolve(self.d, self.d)
    self.mode = mode

    # One kernel per dimension, listed from the last dimension to the first.
    h_list = [self.d2 if i == axis else NP.array([1])
              for i in reversed(range(self.ndim))]
    m = [len(h) for h in h_list]
    self.m = m

    # In 'circ' mode the parent receives the size reduced by the kernel
    # lengths; otherwise the size is passed through unchanged.
    shape = array(n) - m + 1 if mode == 'circ' else n
    super(Gradient2Filter, self).__init__(shape, h_list, mode=mode)
def plot_ema(self, n, p, linestyle):
    """Compute a power-weighted moving average of the signal and plot it.

    The ``n`` weights are ``i**p`` for ``i = 0 .. n-1``, normalised to sum
    to one and reversed before being convolved ('valid') with column 1 of
    ``self.signal``.  The result is stored in ``self.ema``, its length and
    the length of ``self.tstamps`` are printed, and it is plotted against
    ``self.tstamps[n-1:]`` with the given ``linestyle``.
    """
    temp = [i**p for i in range(n)]
    # Sum once instead of once per weight.
    total = sum(temp)
    mask = [float(x)/total for x in reversed(temp)]
    self.ema = np.convolve(self.signal[:,1], mask, mode='valid')
    # print() with a single argument behaves the same on Python 2 and 3.
    print(len(self.ema))
    print(len(self.tstamps))
    self.plt.plot(self.tstamps[n-1:], self.ema, linestyle, linewidth=1.7)
def test_convolve_empty(self, level=rlevel):
    """np.convolve must reject an empty array in either position."""
    # Convolve should raise an error for empty input array.
    for first, second in (([], [1]), ([1], [])):
        self.assertRaises(ValueError, np.convolve, first, second)
def test_object(self): d = [1.] * 100 k = [1.] * 3 assert_array_almost_equal(np.convolve(d, k)[2:-2], np.full(98, 3))
def test_no_overwrite(self): d = np.ones(100) k = np.ones(3) np.convolve(d, k) assert_array_equal(d, np.ones(100)) assert_array_equal(k, np.ones(3))
def polymul(c1, c2):
    """
    Multiply one polynomial by another.

    Returns the product of two polynomials `c1` * `c2`.  The arguments are
    sequences of coefficients, from lowest order term to highest, e.g.,
    [1,2,3] represents the polynomial ``1 + 2*x + 3*x**2.``

    Parameters
    ----------
    c1, c2 : array_like
        1-D arrays of coefficients representing a polynomial, relative to the
        "standard" basis, and ordered from lowest order term to highest.

    Returns
    -------
    out : ndarray
        Of the coefficients of their product.

    See Also
    --------
    polyadd, polysub, polydiv, polypow

    Examples
    --------
    >>> from numpy.polynomial import polynomial as P
    >>> c1 = (1,2,3)
    >>> c2 = (3,2,1)
    >>> P.polymul(c1,c2)
    array([  3.,   8.,  14.,   8.,   3.])

    """
    # as_series hands back trimmed copies of both inputs
    series1, series2 = pu.as_series([c1, c2])
    product = np.convolve(series1, series2)
    return pu.trimseq(product)
def runningMean(self, x, N):
    """Return the N-sample running mean of ``x``.

    The full convolution is computed and its first ``N - 1`` entries are
    dropped; the trailing entries, where the window runs past the end of
    ``x``, are kept.
    """
    kernel = np.ones((N,)) / N
    full = np.convolve(x, kernel)
    return full[(N - 1):]
def smooth_reward_curve(x, y):
    """Smooth ``y`` with a centred box window and thin out the result.

    The window half-width is ``ceil(len(x) / 30)``, capped at 31.  ``x`` is
    trimmed by the half-width at both ends to stay aligned with the
    smoothed ``y``.  Both outputs are then subsampled with a step of
    ``max(floor(len / 1000), 1)``.

    Returns the pair ``(x_smoothed, y_smoothed)``.
    """
    # Halfwidth of our smoothing convolution
    k = min(31, int(np.ceil(len(x) / 30)))
    box = np.ones(2 * k + 1)

    window_sums = np.convolve(y, box, mode='valid')
    window_counts = np.convolve(np.ones_like(y), box, mode='valid')
    ysmoo = window_sums / window_counts
    xsmoo = x[k:-k]

    downsample = max(int(np.floor(len(xsmoo) / 1e3)), 1)
    return xsmoo[::downsample], ysmoo[::downsample]
def smooth_curve(x):
    """Smooth a 1-D sequence with a normalised Kaiser window of length 11.

    The input is extended at both ends with reversed copies of itself
    before the convolution; the result has the same length as ``x``.

    Reference: http://glowingpython.blogspot.jp/2012/02/convolution-with-numpy.html
    """
    window_len = 11
    head = x[window_len - 1:0:-1]
    tail = x[-1:-window_len:-1]
    extended = np.r_[head, x, tail]

    kaiser = np.kaiser(window_len, 2)
    smoothed = np.convolve(kaiser / kaiser.sum(), extended, mode='valid')
    return smoothed[5:len(smoothed) - 5]
def rolling(N, i, loss, err):
    """Return N-sample rolling means of ``loss`` and ``err``.

    ``i`` is trimmed to start at index ``N - 1`` so that it lines up with
    the 'valid' convolution results.

    Returns the tuple ``(i_, loss_, err_)``.
    """
    kernel = np.full(N, 1./N)
    loss_, err_ = (np.convolve(series, kernel, 'valid')
                   for series in (loss, err))
    return i[N-1:], loss_, err_