The following 49 code examples, extracted from open-source Python projects, illustrate how to use numpy.average().
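Before the project examples, here is a minimal sketch of the numpy.average() calls these snippets rely on (plain, axis-wise, and weighted averaging); the input array and weights below are made up purely for illustration:

import numpy as np

a = np.array([[1., 2., 3.],
              [4., 5., 6.]])

np.average(a)                             # 3.5 -- flat mean over all elements
np.average(a, axis=0)                     # array([2.5, 3.5, 4.5]) -- per-column means
np.average(a, axis=1, weights=[1, 2, 3])  # weighted mean along each row
avg, sw = np.average(a, axis=0, weights=[1, 3], returned=True)  # also returns the sum of weights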
def getmarkercenter(image, pos):
    mkradius = getapproxmarkerradius(image)
    buffer = int(mkradius * 0.15)
    roisize = mkradius + buffer  # half of the height or width
    x = pos[0] - roisize
    y = pos[1] - roisize
    w = 2 * roisize
    h = 2 * roisize
    roi = image[y:y+h, x:x+w]

    grayroi = getgrayimage(roi)
    ret, binimage = cv2.threshold(grayroi, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    nlabels, labels, stats, centroids = cv2.connectedComponentsWithStats(binimage)

    # stats[0], centroids[0] are for the background label. ignore
    lblareas = stats[1:, cv2.CC_STAT_AREA]

    ave = np.average(centroids[1:], axis=0, weights=lblareas)
    return tuple(np.array([x, y]) + ave)  # weighted average pos of centroids
def calculate_gap(predictions, actuals, top_k=20):
    """Performs a local (numpy) calculation of the global average precision.

    Only the top_k predictions are taken for each of the videos.

    Args:
        predictions: Matrix containing the outputs of the model.
            Dimensions are 'batch' x 'num_classes'.
        actuals: Matrix containing the ground truth labels.
            Dimensions are 'batch' x 'num_classes'.
        top_k: How many predictions to use per video.

    Returns:
        float: The global average precision.
    """
    gap_calculator = ap_calculator.AveragePrecisionCalculator()
    sparse_predictions, sparse_labels, num_positives = top_k_by_class(predictions, actuals, top_k)
    gap_calculator.accumulate(flatten(sparse_predictions), flatten(sparse_labels), sum(num_positives))
    return gap_calculator.peek_ap_at_n()
def minScalErr(stec, el, z, thisBias):
    """
    This determines the slope of the vTEC vs. elevation line, which should be
    minimized in the minimum scalloping technique for receiver bias removal.

    inputs:
        stec - time indexed Series of slant TEC values
        el - corresponding elevation values, also Series
        z - mapping function values to convert to vTEC from entire file,
            may contain nans, Series
        thisBias - the bias to be tested and minimized
    """
    intel = np.asarray(el[stec.index], int)  # bin the elevation values into int
    sTEC = np.asarray(stec, float)
    zmap = z[stec.index]
    c = np.array([(i, np.average((sTEC[intel == i] - thisBias) / zmap[intel == i]))
                  for i in np.unique(intel) if i > 30])

    return np.polyfit(c[:, 0], c[:, 1], 1)[0]
def score_fusion_strategy(strategy_name='average'):
    """Returns a function to compute a fusion strategy between different scores.

    Different strategies are employed:

    * ``'average'`` : The averaged score is computed using the :py:func:`numpy.average` function.
    * ``'min'`` : The minimum score is computed using the :py:func:`min` function.
    * ``'max'`` : The maximum score is computed using the :py:func:`max` function.
    * ``'median'`` : The median score is computed using the :py:func:`numpy.median` function.
    * ``None`` is also accepted, in which case ``None`` is returned.
    """
    try:
        return {
            'average': numpy.average,
            'min': min,
            'max': max,
            'median': numpy.median,
            None: None
        }[strategy_name]
    except KeyError:
        # warn("score fusion strategy '%s' is unknown" % strategy_name)
        return None
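As a quick, hypothetical illustration of the returned callable (assuming the function above and its numpy import are in scope; the scores are made up):

fuse = score_fusion_strategy('average')   # -> numpy.average
fuse([0.2, 0.5, 0.8])                     # 0.5
score_fusion_strategy('unknown')          # unknown key falls back to None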
def __init__(self, parent, mlca, width=6, height=6, dpi=100):
    figure = Figure(figsize=(width, height), dpi=dpi, tight_layout=True)
    axes = figure.add_subplot(111)
    super(LCAResultsPlot, self).__init__(figure)
    self.setParent(parent)
    activity_names = [format_activity_label(next(iter(f.keys()))) for f in mlca.func_units]

    # From https://stanford.edu/~mwaskom/software/seaborn/tutorial/color_palettes.html
    cmap = sns.cubehelix_palette(8, start=.5, rot=-.75, as_cmap=True)
    hm = sns.heatmap(
        # mlca.results / np.average(mlca.results, axis=0),  # Normalize to get relative results
        mlca.results,
        annot=True,
        linewidths=.05,
        cmap=cmap,
        xticklabels=["\n".join(x) for x in mlca.methods],
        yticklabels=activity_names,
        ax=axes,
        square=False,
    )
    hm.tick_params(labelsize=8)
    self.setMinimumSize(self.size())
    # sns.set_context("notebook")
def model_sentiment(self, b, s, fo=0.99):
    """
    Defines the real-time sentiment model given a dataframe of tweets.

    :param b: A ``TweetBin`` to calculate the new sentiment value.
    :param s: The initial Sentiment to begin calculation.
    :param fo: Fall-off factor
    """
    df = b.df.loc[b.df.sentiment != 0]  # drop rows having 0 sentiment
    newval = s.value
    if len(df) > 0:
        try:
            val = np.average(
                df.sentiment,
                weights=df.followers_count + df.friends_count
            )
        except ZeroDivisionError:
            val = 0
        newval = s.value * fo + val * (1 - fo)
    return Sentiment(newval, b.influence, b.start, b.end)
def load_cv_folds(model_name):
    # load_cv_folds takes the model name
    models = []
    for i in range(5):
        net = np.load(paths.predictions + model_name + '-split_{}.npz'.format(i))
        net = {
            'train': np.average(net['train'], axis=0),
            'val': np.average(net['val'], axis=0),
            'test': np.average(net['test'], axis=0)
        }
        models.append(net)

    labels_df = labels.get_labels_df()
    kf = sklearn.model_selection.KFold(n_splits=5, shuffle=True, random_state=1)
    split = kf.split(labels_df)

    folds = []
    for i, ((train_idx, val_idx), net) in enumerate(zip(split, models)):
        val = labels_df.ix[val_idx]
        train = labels_df.ix[train_idx]
        folds.append((net, val, train))
        print(i)
    return folds
def print_timing_analysis(self):
    border = 20
    if len(self.timing_analysis) < 2 * border:
        return False
    analysis_length = len(self.timing_analysis) - border
    print('Total of {} pulse pairs has been sent'.format(analysis_length))

    stamps = self.timing_analysis[border:analysis_length]
    deltas = []
    for i in range(len(stamps) - 1, 2, -1):
        deltas.append(stamps[i] - stamps[i - 1])

    # print('Filter out pen ups')
    deltas = [d for d in deltas if d < 0.1]

    print('For {} pulses:'.format(analysis_length))
    freq = 1 / np.average(deltas)
    print('Pulse frequency was: {0:.2f} Hz'.format(freq))
    print('StdDev of sleep times: {0:.6f} seconds'.format(np.std(deltas)))
    self.timing_analysis = []
def analyze_distance_to_target(self):
    if len(self.y_distances) > 0:
        average_y = np.average(self.y_distances)
    else:
        return
    self.y_distances = []
    self.lag_log.append(average_y)

    if len(self.lag_log) > self.lag_cycles * 2:
        test_region = self.lag_log[-self.lag_cycles:]
        sum_lag = sum(test_region)
        if sum_lag < 0:
            if abs(sum_lag) > self.lag_cycles * self.boost_threshold:
                return 1
        if sum_lag > 0:
            if abs(sum_lag) > self.lag_cycles * self.boost_threshold:
                return -1
    return 0
def print_timing_analysis(self):
    border = 10
    if len(self.anoto_timing) < 2 * border:
        return False
    analyze_window = len(self.anoto_timing) - border

    stamps = self.anoto_timing[border:analyze_window]
    deltas = []
    for i in range(len(stamps) - 1, 2, -1):
        deltas.append(stamps[i] - stamps[i - 1])

    print('Total of this many anoto events: {}'.format(len(self.anoto_timing)))
    print('Number of all deltas {}'.format(len(deltas)))
    deltas = [d for d in deltas if d > 0.009]
    print('Number of filtered deltas {}'.format(len(deltas)))

    # filter out pen ups
    deltas = [d for d in deltas if d < 0.5]

    freq = 1 / np.average(deltas)
    print('Anoto sample frequency was: {0:.2f} Hz'.format(freq))
    print('StdDev of delays is {0:.3f} seconds'.format(np.std(deltas)))
    self.anoto_timing = []
def test_basic(self):
    y1 = np.array([1, 2, 3])
    assert_(average(y1, axis=0) == 2.)

    y2 = np.array([1., 2., 3.])
    assert_(average(y2, axis=0) == 2.)

    y3 = [0., 0., 0.]
    assert_(average(y3, axis=0) == 0.)

    y4 = np.ones((4, 4))
    y4[0, 1] = 0
    y4[1, 0] = 2
    assert_almost_equal(y4.mean(0), average(y4, 0))
    assert_almost_equal(y4.mean(1), average(y4, 1))

    y5 = rand(5, 5)
    assert_almost_equal(y5.mean(0), average(y5, 0))
    assert_almost_equal(y5.mean(1), average(y5, 1))

    y6 = np.matrix(rand(5, 5))
    assert_array_equal(y6.mean(0), average(y6, 0))
def test_returned(self):
    y = np.array([[1, 2, 3], [4, 5, 6]])

    # No weights
    avg, scl = average(y, returned=True)
    assert_equal(scl, 6.)

    avg, scl = average(y, 0, returned=True)
    assert_array_equal(scl, np.array([2., 2., 2.]))

    avg, scl = average(y, 1, returned=True)
    assert_array_equal(scl, np.array([3., 3.]))

    # With weights
    w0 = [1, 2]
    avg, scl = average(y, weights=w0, axis=0, returned=True)
    assert_array_equal(scl, np.array([3., 3., 3.]))

    w1 = [1, 2, 3]
    avg, scl = average(y, weights=w1, axis=1, returned=True)
    assert_array_equal(scl, np.array([6., 6.]))

    w2 = [[0, 0, 1], [1, 2, 3]]
    avg, scl = average(y, weights=w2, axis=1, returned=True)
    assert_array_equal(scl, np.array([1., 6.]))
def test_testAverage1(self):
    # Test of average.
    ott = array([0., 1., 2., 3.], mask=[True, False, False, False])
    assert_equal(2.0, average(ott, axis=0))
    assert_equal(2.0, average(ott, weights=[1., 1., 2., 1.]))
    result, wts = average(ott, weights=[1., 1., 2., 1.], returned=1)
    assert_equal(2.0, result)
    self.assertTrue(wts == 4.0)

    ott[:] = masked
    assert_equal(average(ott, axis=0).mask, [True])

    ott = array([0., 1., 2., 3.], mask=[True, False, False, False])
    ott = ott.reshape(2, 2)
    ott[:, 1] = masked
    assert_equal(average(ott, axis=0), [2.0, 0.0])
    assert_equal(average(ott, axis=1).mask[0], [True])
    assert_equal([2., 0.], average(ott, axis=0))
    result, wts = average(ott, axis=0, returned=1)
    assert_equal(wts, [1., 0.])
def adaboost(x, y, n):
    g_stp = []
    u = [1 / n] * n
    T = 15
    for t in range(T):
        print(u)
        s, i, theta, errs = dsa(x, y, n, u)
        epsilon = np.average(errs, weights=u) / sum(u)
        scale = np.sqrt((1 - epsilon) / epsilon)
        for u_i in range(len(u)):
            if errs[u_i]:
                u[u_i] = u[u_i] * scale
            else:
                u[u_i] = u[u_i] / scale
        alpha = np.log(scale)
        g_stp.append((s, i, theta, alpha))
    return g_stp
def calc_coordinate_averages(coord_arrays):
    """
    Calculate the average of all coordinate tuples for the correct color.

    parameter: dictionary with color key and value as array of coords in (y, x)
    returns: dictionary with color key and value as array of coords in (x, y)!!!
    """
    # TODO: Sort out all circles not matching specific pixel range
    coords = {}
    for key, array in coord_arrays.items():
        temp = numpy.average(array, axis=0)
        coords[key] = (int(temp[1]), int(temp[0]))
    return coords

#########################################################################################################
#########################################################################################################
def compute_mean_ci(interp_sens, confidence=0.95):
    sens_mean = np.zeros((interp_sens.shape[1]), dtype='float32')
    sens_lb = np.zeros((interp_sens.shape[1]), dtype='float32')
    sens_up = np.zeros((interp_sens.shape[1]), dtype='float32')

    Pz = (1.0 - confidence) / 2.0

    for i in range(interp_sens.shape[1]):
        # get sorted vector
        vec = interp_sens[:, i]
        vec.sort()

        sens_mean[i] = np.average(vec)
        sens_lb[i] = vec[int(math.floor(Pz * len(vec)))]
        sens_up[i] = vec[int(math.floor((1.0 - Pz) * len(vec)))]

    return sens_mean, sens_lb, sens_up
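One way to exercise compute_mean_ci is with a synthetic matrix in which rows play the role of resampled sensitivity curves and columns are the operating points they were interpolated at; the data below is random and purely illustrative:

import numpy as np

rng = np.random.default_rng(0)
interp_sens = rng.uniform(0.6, 0.9, size=(1000, 50)).astype('float32')  # 1000 resamples x 50 points

sens_mean, sens_lb, sens_up = compute_mean_ci(interp_sens, confidence=0.95)
# each output has shape (50,): per-column mean plus the 2.5% / 97.5% empirical bounds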
def analyze(self):
    self.neighborgrid()
    # just looking at up and left to avoid needless double calculations
    slopes = np.concatenate((np.abs(self.left - self.center), np.abs(self.up - self.center)))
    return '\n'.join(["%-15s: %.3f" % t for t in [
        ('height average', np.average(self.center)),
        ('height median', np.median(self.center)),
        ('height max', np.max(self.center)),
        ('height min', np.min(self.center)),
        ('height std', np.std(self.center)),
        ('slope average', np.average(slopes)),
        ('slope median', np.median(slopes)),
        ('slope max', np.max(slopes)),
        ('slope min', np.min(slopes)),
        ('slope std', np.std(slopes))
    ]])
def write(self, global_step):
    batch_sizes = np.array(self.batch_sizes)

    fetches = []
    feed_dict = {}
    summary_values = {}
    for name, values in self.batch_values.items():
        summary_runner = self.manager.get_summary_runner(name)
        epoch_value = np.average(values, weights=batch_sizes)
        fetches.append(summary_runner.summary)
        feed_dict[summary_runner.placeholder] = epoch_value
        summary_values[name] = epoch_value

    epoch_summaries = self.manager.sess.run(fetches, feed_dict=feed_dict)
    for epoch_summary in epoch_summaries:
        self.writer.add_summary(epoch_summary, global_step)
    self.writer.flush()
    self.reset()
    return summary_values
def drawGraph(self):
    graph = gui.Surface((300, 200))
    graph.set_colorkey((0, 0, 0))
    gens = len(self.genScores)
    for genome in range(40):
        pointlist = [(0, 200)]
        for generation in range(gens):
            x = int(300 * (generation + 1) / gens)
            y = 200 - int(200 * self.genScores[generation][genome] / self.highScore)
            pointlist.append((x, y))
        if genome in [0, 19, 39]:
            gui.draw.lines(graph, (0, 0, 255), False, pointlist, 2)
        else:
            gui.draw.lines(graph, (112, 108, 90), False, pointlist, 1)

    pointlist = [(0, 200)]
    for generation in range(gens):
        x = int(300 * (generation + 1) / gens)
        y = 200 - int(200 * np.average(self.genScores[generation]) / self.highScore)
        pointlist.append((x, y))
    gui.draw.lines(graph, (255, 0, 0), False, pointlist, 2)
    self.lastGraph = graph
def integrate_blindly(self, target_time, step=None):
    """
    Like `jitcdde`’s `integrate_blindly`, except for orthonormalising the
    separation functions after each step and the output being analogous to
    `jitcdde_lyap`’s `integrate`.
    """
    dt, number, total_integration_time = self._prepare_blind_int(target_time, step)

    instantaneous_lyaps = []

    for _ in range(number):
        self.DDE.get_next_step(dt)
        self.DDE.accept_step()
        self.DDE.forget(self.max_delay)
        norms = self.DDE.orthonormalise(self._n_lyap, self.max_delay)
        instantaneous_lyaps.append(np.log(norms) / dt)

    lyaps = np.average(instantaneous_lyaps, axis=0)

    return self.DDE.get_current_state()[:self.n_basic], lyaps, total_integration_time
def integrate_blindly(self, target_time, step=None):
    """
    Like `jitcdde`’s `integrate_blindly`, except for normalising and aligning
    the separation function after each step and the output being analogous to
    `jitcdde_restricted_lyap`’s `integrate`.
    """
    dt, number, total_integration_time = self._prepare_blind_int(target_time, step)

    instantaneous_lyaps = []

    for _ in range(number):
        self.DDE.get_next_step(dt)
        self.DDE.accept_step()
        self.DDE.forget(self.max_delay)
        norm = self.remove_projections()
        instantaneous_lyaps.append(np.log(norm) / dt)

    lyap = np.average(instantaneous_lyaps)
    state = self.DDE.get_current_state()[:self.n_basic]

    return state, lyap, total_integration_time
def integrate_blindly(self, target_time, step=None):
    """
    Like `jitcdde`’s `integrate_blindly`, except for normalising and aligning
    the separation function after each step and the output being analogous to
    `jitcdde_transversal_lyap`’s `integrate`.
    """
    dt, number, total_integration_time = self._prepare_blind_int(target_time, step)

    instantaneous_lyaps = []

    for _ in range(number):
        self.DDE.get_next_step(dt)
        self.DDE.accept_step()
        self.DDE.forget(self.max_delay)
        norm = self.DDE.normalise_indices(self.max_delay)
        instantaneous_lyaps.append(np.log(norm) / dt)

    lyap = np.average(instantaneous_lyaps)
    state = self.DDE.get_current_state()[self.G.main_indices]

    return state, lyap, total_integration_time
def weighted_avg_and_std(values, weights=None):
    '''
    Return the weighted average and standard deviation.

    `values` - np.ndarray of values to average.
    `weights` - Optional np.ndarray of weights. Otherwise all values are
                assumed equally weighted.

    Note the helpful np.fromiter() function for building arrays.
    '''
    if not isinstance(values, np.ndarray):
        raise TypeError("Values must be an np.array")
    if len(values) == 0:
        raise ValueError("Can't calculate with no values")
    if weights is not None:
        if not isinstance(weights, np.ndarray):
            raise TypeError("Weights must be None or an np.array")
        if len(values) != len(weights):
            raise ValueError("Length of values and weights differ")
    average = np.average(values, weights=weights)
    variance = np.average((values - average) ** 2, weights=weights)  # Fast and numerically precise
    return (average, math.sqrt(variance))
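A minimal call sketch, assuming the function above is in scope (the values and weights are made up):

import numpy as np

values = np.array([1.0, 2.0, 3.0, 4.0])
weights = np.array([1.0, 1.0, 1.0, 5.0])

mean, std = weighted_avg_and_std(values, weights)  # mean pulled toward 4.0 (here 3.25)
mean, std = weighted_avg_and_std(values)           # unweighted: plain mean 2.5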
def printAnalysisStats(self):
    print("Analysis of %d metric datapoints" % len(self._analysis_metrics))

    trade_mean, trade_std = weighted_avg_and_std(self._trade_prices, self._trade_volumes)
    print("Weighted mean trade price: %.2f" % trade_mean)
    print("Weighted STD trade price: %.2f" % trade_std)

    mean = np.average(list(map(abs, self._analysis_metrics)))
    print("Metric mean of abs: %s" % mean)
    std = np.std(self._analysis_metrics)
    print("Metric standard deviation: %f" % std)

    n_sigma = 2
    faktor = 5 / (n_sigma * std)
    print("Best quantization factor: %f" % faktor)

    mean, sd = weighted_avg_and_std(self._trade_intervals)
    print("Mean gap between trades: %.2fs" % mean)
    print("SD of trade gaps: %.2fs" % sd)
def get_similarity_scores(verb_token, vectorizer, tf_idf_matrix):
    """
    Compute the cosine similarity score of a given verb token
    against the input corpus TF/IDF matrix.

    :param str verb_token: Surface form of a verb, e.g., *born*
    :param sklearn.feature_extraction.text.TfidfVectorizer vectorizer:
        Vectorizer used to transform verbs into vectors
    :return: cosine similarity score
    :rtype: ndarray
    """
    verb_token_vector = vectorizer.transform([verb_token])
    # Here the linear kernel is the same as the cosine similarity, but faster
    # cf. http://scikit-learn.org/stable/modules/metrics.html#cosine-similarity
    scores = linear_kernel(verb_token_vector, tf_idf_matrix)
    logger.debug("Corpus-wide TF/IDF scores for '%s': %s" % (verb_token, scores))
    logger.debug("Average TF/IDF score for '%s': %f" % (verb_token, average(scores)))
    return scores
def ndcg(self, rankers, cutoff):
    '''
    rankers: instances of Ranker
    cutoff: cutoff for nDCG
    '''
    result = defaultdict(list)
    for q in self.docs:
        documents = self.docs[q]
        rels = {id(d): d.rel for d in documents}
        for idx, ranker in enumerate(rankers):
            res = ranker.rank(documents)
            ranked_list = [id(d) for d in res]
            score = ndcg(ranked_list, rels, cutoff)
            result[idx].append(score)
    for idx in result:
        result[idx] = np.average(result[idx])
    return result
def correlations(A, B, pc_n=100):
    p = (1 - distance.correlation(A.flatten(), B.flatten()))
    spear = spearmanr(A.flatten(), B.flatten())

    dist_genes = np.zeros(A.shape[0])
    for i in range(A.shape[0]):
        dist_genes[i] = 1 - distance.correlation(A[i], B[i])
    pg = (np.average(dist_genes[np.isfinite(dist_genes)]))

    dist_sample = np.zeros(A.shape[1])
    for i in range(A.shape[1]):
        dist_sample[i] = 1 - distance.correlation(A[:, i], B[:, i])
    ps = (np.average(dist_sample[np.isfinite(dist_sample)]))

    pc_dist = []
    if pc_n > 0:
        u0, s0, vt0 = np.linalg.svd(A)
        u, s, vt = np.linalg.svd(B)
        for i in range(pc_n):
            pc_dist.append(abs(1 - distance.cosine(u0[:, i], u[:, i])))
        pc_dist = np.array(pc_dist)

    return p, spear[0], pg, ps, pc_dist
def record_tabular_misc_stat(key, values, placement='back'):
    if placement == 'front':
        prefix = ""
        suffix = key
    else:
        prefix = key
        suffix = ""
    if len(values) > 0:
        record_tabular(prefix + "Average" + suffix, np.average(values))
        record_tabular(prefix + "Std" + suffix, np.std(values))
        record_tabular(prefix + "Median" + suffix, np.median(values))
        record_tabular(prefix + "Min" + suffix, np.min(values))
        record_tabular(prefix + "Max" + suffix, np.max(values))
    else:
        record_tabular(prefix + "Average" + suffix, np.nan)
        record_tabular(prefix + "Std" + suffix, np.nan)
        record_tabular(prefix + "Median" + suffix, np.nan)
        record_tabular(prefix + "Min" + suffix, np.nan)
        record_tabular(prefix + "Max" + suffix, np.nan)
def step(self):
    """
    Half of the step of k-means
    """
    if self.step_completed:
        d = self.data.X
        points = [d[self.clusters == i] for i in range(len(self.centroids))]
        for i in range(len(self.centroids)):
            c_points = points[i]
            self.centroids[i, :] = (np.average(c_points, axis=0)
                                    if len(c_points) > 0 else np.nan)
        # reinitialize empty centroids
        nan_c = np.isnan(self.centroids).any(axis=1)
        if np.count_nonzero(nan_c) > 0:
            self.centroids[nan_c] = self.random_positioning(
                np.count_nonzero(nan_c))
        self.centroids_moved = True
    else:
        self.clusters = self.find_clusters(self.centroids)
        self.centroids_moved = False
    self.step_no += 1
    self.centroids_history = self.set_list(
        self.centroids_history, self.step_no, np.copy(self.centroids))
def solint_numpy_indexing(dsref):
    start = time.time()
    dsref.as_numarray()
    tms = numpy.unique(dsref.x)
    # check if there is something to be averaged at all
    if len(tms) == len(dsref.x):
        return time.time() - start

    newds = dataset()
    for tm in tms:
        newds.append(tm, numpy.average(dsref.y[numpy.where(dsref.x == tm)]))

    dsref.x = newds.x
    dsref.y = newds.y
    return time.time() - start
def solint_pure_python3(dsref):
    start = time.time()
    tms = set(dsref.x)
    # check if there is something to be averaged at all
    if len(tms) == len(dsref.x):
        return time.time() - start

    # accumulate data into bins of the same time
    r = reduce(lambda acc, (tm, y): acc[tm].add(y) or acc,
               itertools.izip(dsref.x, dsref.y),
               collections.defaultdict(average))
    # do the averaging
    (x, y) = reduce(lambda (xl, yl), (tm, ys): (xl.append(tm) or xl, yl.append(ys.avg()) or yl),
                    r.iteritems(), (list(), list()))

    dsref.x = x
    dsref.y = y
    return time.time() - start
def markdown_overall_speedups(_type, _timing, r_benchmarks):
    txt_geomean = ' Geometric mean :: '
    txt_avg = ' Average :: '
    txt_max = ' Maximum :: '
    for _interp in r_benchmarks:
        txt_geomean += _interp + ': `' + ("%.3f" % geomean(r_benchmarks[_interp])) + 'x`, '
        txt_avg += _interp + ': `' + ("%.3f" % np.average(r_benchmarks[_interp])) + 'x`, '
        txt_max += _interp + ': `' + ("%.3f" % max(r_benchmarks[_interp])) + 'x`, '

        if _interp not in benchmarks_stats_overall:
            benchmarks_stats_overall[_interp] = {}
        if _timing not in benchmarks_stats_overall[_interp]:
            benchmarks_stats_overall[_interp][_timing] = []
        benchmarks_stats_overall[_interp][_timing] += r_benchmarks[_interp]

    txt_geomean += '\n\n'
    txt_avg += '\n\n'
    txt_max += '\n\n'

    if _type not in benchmarks_stats_types:
        benchmarks_stats_types[_type] = {}
    benchmarks_stats_types[_type][_timing] = [txt_geomean, txt_avg, txt_max]
def calculate_hit_at_one(predictions, actuals):
    """Performs a local (numpy) calculation of the hit at one.

    Args:
        predictions: Matrix containing the outputs of the model.
            Dimensions are 'batch' x 'num_classes'.
        actuals: Matrix containing the ground truth labels.
            Dimensions are 'batch' x 'num_classes'.

    Returns:
        float: The average hit at one across the entire batch.
    """
    top_prediction = numpy.argmax(predictions, 1)
    hits = actuals[numpy.arange(actuals.shape[0]), top_prediction]
    return numpy.average(hits)
def calculate_precision_at_equal_recall_rate(predictions, actuals):
    """Performs a local (numpy) calculation of the PERR.

    Args:
        predictions: Matrix containing the outputs of the model.
            Dimensions are 'batch' x 'num_classes'.
        actuals: Matrix containing the ground truth labels.
            Dimensions are 'batch' x 'num_classes'.

    Returns:
        float: The average precision at equal recall rate across the entire batch.
    """
    aggregated_precision = 0.0
    num_videos = actuals.shape[0]
    for row in numpy.arange(num_videos):
        num_labels = int(numpy.sum(actuals[row]))
        top_indices = numpy.argpartition(predictions[row], -num_labels)[-num_labels:]
        item_precision = 0.0
        for label_index in top_indices:
            if predictions[row][label_index] > 0:
                item_precision += actuals[row][label_index]
        item_precision /= top_indices.size
        aggregated_precision += item_precision
    aggregated_precision /= num_videos
    return aggregated_precision
def get_regression_data(name, split, data_path=data_path):
    path = '{}{}.csv'.format(data_path, name)

    if not os.path.isfile(path):
        download(name + '.csv', data_path=data_path)

    data = pandas.read_csv(path, header=None).values

    if name in ['energy', 'naval']:
        # there are two Ys for these, but take only the first
        X_full = data[:, :-2]
        Y_full = data[:, -2]
    else:
        X_full = data[:, :-1]
        Y_full = data[:, -1]

    X, Y, Xs, Ys = make_split(X_full, Y_full, split)

    ############# whiten inputs
    X_mean, X_std = np.average(X, 0), np.std(X, 0) + 1e-6
    X = (X - X_mean) / X_std
    Xs = (Xs - X_mean) / X_std

    return X, Y[:, None], Xs, Ys[:, None]
def test_weighted_average(self):
    """ Test results of weighted average against numpy.average """
    stream = [np.random.random(size=(16, 16)) for _ in range(5)]

    with self.subTest('float weights'):
        weights = [random() for _ in stream]
        from_iaverage = last(iaverage(stream, weights=weights))
        from_numpy = np.average(np.dstack(stream), axis=2, weights=np.array(weights))
        self.assertTrue(np.allclose(from_iaverage, from_numpy))

    with self.subTest('array weights'):
        weights = [np.random.random(size=stream[0].shape) for _ in stream]
        from_iaverage = last(iaverage(stream, weights=weights))
        from_numpy = np.average(np.dstack(stream), axis=2, weights=np.dstack(weights))
        self.assertTrue(np.allclose(from_iaverage, from_numpy))