We extracted the following 50 code examples from open-source Python projects to illustrate how to use numpy.cov().
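As a quick orientation before the project examples, here is a minimal sketch of numpy.cov()'s calling conventions: each row is a variable by default, rowvar=False switches to columns, and bias/ddof control the normalization.

import numpy as np

x = np.array([1.0, 2.0, 3.0, 4.0])
y = np.array([2.0, 4.0, 5.0, 9.0])

# Two 1-D variables: np.cov returns the 2x2 covariance matrix.
C = np.cov(x, y)  # C[0, 1] is Cov(x, y), C[0, 0] is Var(x)

# A 2-D array: by default each ROW is a variable ...
m = np.vstack([x, y])
assert np.allclose(np.cov(m), C)

# ... and rowvar=False treats each COLUMN as a variable instead.
assert np.allclose(np.cov(m.T, rowvar=False), C)

# bias=True (or ddof=0) normalizes by N instead of N - 1.
assert np.allclose(np.cov(x, ddof=0), x.var())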
def _init_coefs(X, method='corrcoef'):
    if method == 'corrcoef':
        return np.corrcoef(X, rowvar=False), 1.0
    elif method == 'cov':
        init_cov = np.cov(X, rowvar=False)
        return init_cov, np.max(np.abs(np.triu(init_cov)))
    elif method == 'spearman':
        return spearman_correlation(X, rowvar=False), 1.0
    elif method == 'kendalltau':
        return kendalltau_correlation(X, rowvar=False), 1.0
    elif callable(method):
        return method(X)
    else:
        raise ValueError(
            ("initialize_method must be 'corrcoef' or 'cov', "
             "passed '{}' .".format(method)))
def PCA(data, num_components=None):
    # mean center the data
    data -= data.mean(axis=0)
    # calculate the covariance matrix
    R = np.cov(data, rowvar=False)
    # calculate eigenvectors & eigenvalues of the covariance matrix
    # use 'eigh' rather than 'eig' since R is symmetric,
    # the performance gain is substantial
    V, E = np.linalg.eigh(R)
    # sort eigenvalues in decreasing order
    idx = np.argsort(V)[::-1]
    E = E[:, idx]  # sort eigenvectors according to same index
    V = V[idx]
    # select the first n eigenvectors (n is desired dimension
    # of rescaled data array, or dims_rescaled_data)
    E = E[:, :num_components]
    # carry out the transformation on the data using eigenvectors
    # and return the re-scaled data, eigenvalues, and eigenvectors
    return np.dot(E.T, data.T).T, V, E
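A quick usage sketch for the PCA helper above, on hypothetical random data. Note the function centers its input in place, so pass a float copy:

import numpy as np

rng = np.random.RandomState(0)
X = rng.randn(100, 5)  # 100 samples, 5 features

scores, eigvals, eigvecs = PCA(X.copy(), num_components=2)
print(scores.shape)  # (100, 2): data projected onto the top 2 components
print(eigvals[:2])   # eigenvalues, sorted in decreasing order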
def calculate_beta(self):
    """
    .. math::

        \\beta_a = \\frac{\\mathrm{Cov}(r_a, r_p)}{\\mathrm{Var}(r_p)}

    http://en.wikipedia.org/wiki/Beta_(finance)
    """
    # it doesn't make much sense to calculate beta for less than two
    # values, so return 0.0.
    if len(self.algorithm_returns) < 2:
        return 0.0

    returns_matrix = np.vstack([self.algorithm_returns,
                                self.benchmark_returns])
    C = np.cov(returns_matrix, ddof=1)
    algorithm_covariance = C[0][1]
    benchmark_variance = C[1][1]
    beta = algorithm_covariance / benchmark_variance

    return beta
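The same beta calculation can be reproduced directly with np.cov on two toy return series (illustrative numbers only):

import numpy as np

algo_returns = np.array([0.01, -0.02, 0.015, 0.03])   # hypothetical
bench_returns = np.array([0.005, -0.01, 0.01, 0.02])  # hypothetical

C = np.cov(np.vstack([algo_returns, bench_returns]), ddof=1)
beta = C[0, 1] / C[1, 1]  # Cov(r_a, r_p) / Var(r_p)
print(beta)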
def stats2(sarray, names=None):
    """Calculate means and (co)variances for structured array data."""
    if names is None:
        names = sarray.dtype.names
    nvar = len(names)
    data = tuple(sarray[name] for name in names)
    cov = np.cov(data)
    nondiag_cov = list(cov[i, j] for i, j in permutations(range(nvar), 2))

    names_ave = list('ave_' + name for name in names)
    names_var = list('var_' + name for name in names)
    names_cov = list('cov_' + n1 + "_" + n2
                     for n1, n2 in permutations(names, 2))

    out = dict(zip(names_ave, np.mean(data, axis=1)))
    out.update(zip(names_var, cov.diagonal()))
    out.update(zip(names_cov, nondiag_cov))

    NamedStats = namedtuple('Stats2', names_ave + names_var + names_cov)
    return NamedStats(**out)
def eval_F(list_data, worker, mu, C, W):
    """
    F = log N(T, mu, C) + SUM q() log Ber(L_ij | T)
    list_il can be from different datasets:
    list_il = list: each elem is a list from a dataset
    """
    res = scipy.stats.multivariate_normal.logpdf(W, mean=mu, cov=C,
                                                 allow_singular=True)
    for dt, data in enumerate(list_data):
        # dataset dt: sen = W[2*dt], fpr = W[2*dt+1]
        sen, fpr = W[2*dt], W[2*dt+1]
        if worker in data.dic_wl:
            for (item, label) in data.dic_wl[worker]:
                # prob of being 1
                qz = data.qz[item]
                res += qz * np.log(Ber(S(sen), label))
                #res += qz * ( label*np.log(S(sen)) + (1-label)*np.log(S(sen)) )
                res += (1 - qz) * np.log(Ber(S(fpr), label))
    return res
def expectation_binorm(rv, mu, var, x, M, C, w=3):
    """
    Evaluate the expectation of log Norm(uv | M, C)
    x = u, v ~ Norm(mu, var)  rv == 'v'
    x = v, u ~ Norm(mu, var)  rv == 'u'
    """
    if rv == 'v':
        f = lambda v: scipy.stats.norm.pdf(v, loc=mu, scale=np.sqrt(var)) * \
            np.log(scipy.stats.multivariate_normal.pdf(
                [x, v], mean=M, cov=C, allow_singular=True))
    else:
        f = lambda u: scipy.stats.norm.pdf(u, loc=mu, scale=np.sqrt(var)) * \
            np.log(scipy.stats.multivariate_normal.pdf(
                [u, x], mean=M, cov=C, allow_singular=True))
    std = np.sqrt(var)
    return scipy.integrate.quad(f, mu - w*std, mu + w*std)[0]
def measure_correlation(gold0, gold1, l=10, pos=0, list_w=[]):
    a = []
    b = []
    for w in gold0.keys():
        if w in gold1 and (w in list_w or list_w == []):
            if gold0[w][0] is not None and gold1[w][0] is not None:
                if gold0[w][2] > l and gold1[w][2] > l:
                    if pos == 0:
                        a.append(logit(gold0[w][pos]))
                        b.append(logit(gold1[w][pos]))
                    else:
                        a.append(logit(1 - gold0[w][pos]))
                        b.append(logit(1 - gold1[w][pos]))
    print(len(a), np.mean(a), np.mean(b))
    return np.cov(a, b)
def mahalanobis_distance(difference, num_random_features):
    num_samples, _ = np.shape(difference)
    sigma = np.cov(np.transpose(difference))
    mu = np.mean(difference, 0)

    if num_random_features == 1:
        stat = float(num_samples * mu ** 2) / float(sigma)
    else:
        try:
            linalg.inv(sigma)
        except LinAlgError:
            print('covariance matrix is singular. Pvalue returned is 1.1')
            warnings.warn('covariance matrix is singular. Pvalue returned is 1.1')
            return 0
        stat = num_samples * mu.dot(linalg.solve(sigma, np.transpose(mu)))

    return chi2.sf(stat, num_random_features)
def heritability(parents, offspring):
    """
    Compute the heritability from parent and offspring samples.

    Parameters
    ----------
    parents : array_like
        Array of data for trait of parents.
    offspring : array_like
        Array of data for trait of offspring.

    Returns
    -------
    output : float
        Heritability of trait.
    """
    par, off = _convert_two_data(parents, offspring)
    covariance_matrix = np.cov(par, off)
    return covariance_matrix[0, 1] / covariance_matrix[0, 0]
def test_1d_w_missing(self):
    # Test cov 1 1D variable w/missing values
    x = self.data
    x[-1] = masked
    x -= x.mean()
    nx = x.compressed()
    assert_almost_equal(np.cov(nx), cov(x))
    assert_almost_equal(np.cov(nx, rowvar=False), cov(x, rowvar=False))
    assert_almost_equal(np.cov(nx, rowvar=False, bias=True),
                        cov(x, rowvar=False, bias=True))
    #
    try:
        cov(x, allow_masked=False)
    except ValueError:
        pass
    #
    # 2 1D variables w/ missing values
    nx = x[1:-1]
    assert_almost_equal(np.cov(nx, nx[::-1]), cov(x, x[::-1]))
    assert_almost_equal(np.cov(nx, nx[::-1], rowvar=False),
                        cov(x, x[::-1], rowvar=False))
    assert_almost_equal(np.cov(nx, nx[::-1], rowvar=False, bias=True),
                        cov(x, x[::-1], rowvar=False, bias=True))
def test_2d_w_missing(self):
    # Test cov on 2D variable w/ missing value
    x = self.data
    x[-1] = masked
    x = x.reshape(3, 4)
    valid = np.logical_not(getmaskarray(x)).astype(int)
    frac = np.dot(valid, valid.T)
    xf = (x - x.mean(1)[:, None]).filled(0)
    assert_almost_equal(cov(x),
                        np.cov(xf) * (x.shape[1] - 1) / (frac - 1.))
    assert_almost_equal(cov(x, bias=True),
                        np.cov(xf, bias=True) * x.shape[1] / frac)
    frac = np.dot(valid.T, valid)
    xf = (x - x.mean(0)).filled(0)
    assert_almost_equal(cov(x, rowvar=False),
                        (np.cov(xf, rowvar=False) *
                         (x.shape[0] - 1) / (frac - 1.)))
    assert_almost_equal(cov(x, rowvar=False, bias=True),
                        (np.cov(xf, rowvar=False, bias=True) *
                         x.shape[0] / frac))
def test_shape_inference(self):
    with self.test_session(use_gpu=True):
        # Static
        mean = 10 * np.random.normal(size=(10, 11, 2)).astype('d')
        cov = np.zeros((10, 11, 2, 2))
        dst = MultivariateNormalCholesky(
            tf.constant(mean), tf.constant(cov))
        self.assertEqual(dst.get_batch_shape().as_list(), [10, 11])
        self.assertEqual(dst.get_value_shape().as_list(), [2])
        # Dynamic
        unk_mean = tf.placeholder(tf.float32, None)
        unk_cov = tf.placeholder(tf.float32, None)
        dst = MultivariateNormalCholesky(unk_mean, unk_cov)
        self.assertEqual(dst.get_value_shape().as_list(), [None])
        feed_dict = {unk_mean: np.ones(2), unk_cov: np.eye(2)}
        self.assertEqual(list(dst.batch_shape.eval(feed_dict)), [])
        self.assertEqual(list(dst.value_shape.eval(feed_dict)), [2])
def test_sample(self):
    with self.fixed_randomness_session(233):
        def test_sample_with(seed):
            mean, cov, cov_chol = self._gen_test_params(seed)
            dst = MultivariateNormalCholesky(
                tf.constant(mean), tf.constant(cov_chol))
            n_exp = 20000
            samples = dst.sample(n_exp)
            sample_shape = (n_exp, 10, 11, 3)
            self.assertEqual(samples.shape.as_list(), list(sample_shape))
            samples = dst.sample(n_exp).eval()
            self.assertEqual(samples.shape, sample_shape)
            self.assertAllClose(
                np.mean(samples, axis=0), mean, rtol=5e-2, atol=5e-2)
            for i in range(10):
                for j in range(11):
                    self.assertAllClose(
                        np.cov(samples[:, i, j, :].T), cov[i, j],
                        rtol=1e-1, atol=1e-1)

        for seed in [23, 233, 2333]:
            test_sample_with(seed)
def test_prob(self):
    with self.fixed_randomness_session(233):
        def test_prob_with(seed):
            mean, cov, cov_chol = self._gen_test_params(seed)
            dst = MultivariateNormalCholesky(
                tf.constant(mean), tf.constant(cov_chol),
                check_numerics=True)
            n_exp = 200
            samples = dst.sample(n_exp).eval()
            log_pdf = dst.log_prob(tf.constant(samples))
            pdf_shape = (n_exp, 10, 11)
            self.assertEqual(log_pdf.shape.as_list(), list(pdf_shape))
            log_pdf = log_pdf.eval()
            self.assertEqual(log_pdf.shape, pdf_shape)
            for i in range(10):
                for j in range(11):
                    log_pdf_exact = stats.multivariate_normal.logpdf(
                        samples[:, i, j, :], mean[i, j], cov[i, j])
                    self.assertAllClose(log_pdf_exact, log_pdf[:, i, j])
            self.assertAllClose(
                np.exp(log_pdf), dst.prob(tf.constant(samples)).eval())

        for seed in [23, 233, 2333]:
            test_prob_with(seed)
def get_neg_log_post(Phi, sigma_J_list, ROI_list, G, MMT, q, Sigma_E, GL,
                     nu, V, prior_on=False):
    eps = 1E-13
    p = Phi.shape[0]
    n_ROI = len(sigma_J_list)
    Qu = Phi.dot(Phi.T)
    G_Sigma_G = np.zeros(MMT.shape)
    for i in range(n_ROI):
        G_Sigma_G += sigma_J_list[i]**2 * np.dot(G[:, ROI_list[i]],
                                                 G[:, ROI_list[i]].T)
    cov = Sigma_E + G_Sigma_G + GL.dot(Qu).dot(GL.T)
    inv_cov = np.linalg.inv(cov)
    eigs = np.real(np.linalg.eigvals(cov)) + eps
    log_det_cov = np.sum(np.log(eigs))
    result = q * log_det_cov + np.trace(MMT.dot(inv_cov))
    if prior_on:
        inv_Q = np.linalg.inv(Qu)
        #det_Q = np.linalg.det(Qu)
        log_det_Q = np.sum(np.log(np.diag(Phi)**2))
        result = result + float(nu + p + 1) * log_det_Q + np.trace(V.dot(inv_Q))
    return result

#==============================================================================
# update both Qu and Sigma_J, gradient of Qu and Sigma J
def plot(self, data, size, newdata=None):
    data = np.array(data)
    numsample = len(data)
    colmean = np.mean(data, axis=0)
    matcov = np.cov(data.T)
    matinv = np.linalg.inv(matcov)

    values = []
    for sample in data:
        dif = sample - colmean
        value = matinv.dot(dif.T).dot(dif)
        values.append(value)

    cl = ((numsample - 1)**2) / numsample
    lcl = cl * beta.ppf(0.00135, size / 2, (numsample - size - 1) / 2)
    center = cl * beta.ppf(0.5, size / 2, (numsample - size - 1) / 2)
    ucl = cl * beta.ppf(0.99865, size / 2, (numsample - size - 1) / 2)

    return (values, center, lcl, ucl, self._title)
def calc_mean_var_loss(epochsInds, loss_train):
    # loss_train has dimensions: #epochs x #batches
    num_of_epochs = loss_train.shape[0]
    # Average over the batches
    loss_train_mean = np.mean(loss_train, 1)
    # The diff divided by the sampled indexes
    d_mean_loss_to_dt = np.sqrt(np.abs(np.diff(loss_train_mean) /
                                       np.diff(epochsInds[:])))
    var_loss = []
    # Go over the epochs
    for epoch_index in range(num_of_epochs):
        # The loss for the specific epoch
        current_loss = loss_train[epoch_index, :]
        # The derivative between the batches
        current_loss_dt = np.diff(current_loss)
        # The mean of its derivative
        average_loss = np.mean(current_loss_dt)
        current_loss_minus_mean = current_loss_dt - average_loss
        # The covariance between the batches
        cov_mat = np.dot(current_loss_minus_mean[:, None],
                         current_loss_minus_mean[None, :])
        # The trace of the covariance matrix
        trac_cov = np.trace(cov_mat)
        var_loss.append(trac_cov)
    return np.array(var_loss), d_mean_loss_to_dt
def match_color_histogram(x, y):
    z = np.zeros_like(x)
    shape = x[0].shape
    for i in six.moves.range(len(x)):
        a = x[i].reshape((3, -1))
        a_mean = np.mean(a, axis=1, keepdims=True)
        a_var = np.cov(a)
        d, v = np.linalg.eig(a_var)
        d += 1e-6
        a_sigma_inv = v.dot(np.diag(d ** (-0.5))).dot(v.T)
        b = y[i].reshape((3, -1))
        b_mean = np.mean(b, axis=1, keepdims=True)
        b_var = np.cov(b)
        d, v = np.linalg.eig(b_var)
        b_sigma = v.dot(np.diag(d ** 0.5)).dot(v.T)
        transform = b_sigma.dot(a_sigma_inv)
        z[i, :] = (transform.dot(a - a_mean) + b_mean).reshape(shape)
    return z
def compute_cmus_scov(self, data, labels):
    """
    Compute class means and shared covariance
    (weighted within-class covariance)
    """
    self.label_map, class_sizes = np.unique(labels, return_counts=True)
    dim = data.shape[1]
    cmus = np.zeros(shape=(len(class_sizes), dim))
    acc = np.zeros(shape=(dim, dim))
    scov = np.zeros_like(acc)
    gl_mu = np.mean(data, axis=0).reshape(-1, 1)
    gl_cov = np.cov(data.T, bias=True)
    for i, k in enumerate(self.label_map):
        data_k = data[np.where(labels == k)[0], :]
        mu_k = np.mean(data_k, axis=0).reshape(-1, 1)
        cmus[i, :] = mu_k[:, 0]
        acc += (gl_mu - mu_k).dot((gl_mu - mu_k).T) * data_k.shape[0]
    acc /= data.shape[0]
    scov = gl_cov - acc
    return cmus, scov, class_sizes
def get_major_minor_axis(img):
    """
    Finds the major and minor axis as 3d vectors of the passed in image
    :param img: CZYX numpy array
    :return: tuple containing two numpy arrays representing the major and
             minor axis as 3d vectors
    """
    # do a mean projection if more than 3 axes
    if img.ndim > 3:
        z, y, x = np.nonzero(np.mean(img, axis=tuple(range(img.ndim - 3))))
    else:
        z, y, x = np.nonzero(img)
    coords = np.stack([x - np.mean(x), y - np.mean(y), z - np.mean(z)])
    # eigenvectors and values of the covariance matrix
    evals, evecs = np.linalg.eig(np.cov(coords))
    # return largest and smallest eigenvectors (major and minor axis)
    order = np.argsort(evals)
    return (evecs[:, order[-1]], evecs[:, order[0]])
def posterior_cov(self):
    """
    Computes the posterior covariance from the samples drawn from
    the posterior distribution

    Returns
    -------
    np.ndarray
        posterior covariance
    """
    endp = len(self.parameters) - 1
    endw = len(self.weights) - 1
    params = self.parameters[endp]
    weights = self.weights[endw]

    return np.cov(np.transpose(params),
                  aweights=weights.reshape(len(weights),))
def __init__(self, score_metric='log_likelihood', init_method='cov',
             auto_scale=True):
    self.score_metric = score_metric
    self.init_method = init_method
    self.auto_scale = auto_scale

    self.covariance_ = None  # assumes a matrix or a list of matrices
    self.precision_ = None   # assumes a matrix or a list of matrices

    # these must be updated upon self.fit()
    # the first 4 will be set if self.init_coefs is used.
    #   self.sample_covariance_
    #   self.lam_scale_
    #   self.n_samples
    #   self.n_features
    self.is_fitted = False

    super(InverseCovarianceEstimator, self).__init__()
def _maximization(self):
    # Maximize priors
    priors = sum(self.responsibility_matrix)
    priors = [float(i) / sum(priors) for i in priors]

    # Maximize means
    mus = [0 for i in range(self.c)]
    for k in range(self.c):
        mus_k = sum(np.multiply(self.samples,
                                self.responsibility_matrix[:, k][:, np.newaxis]))
        normalized_mus_k = mus_k / sum(self.responsibility_matrix[:, k])
        mus[k] = normalized_mus_k

    # Maximize covariances
    covs = [0 for i in range(self.c)]
    for k in range(self.c):
        covs[k] = np.cov(self.samples.T,
                         aweights=self.responsibility_matrix[:, k])

    return priors, mus, covs
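The aweights argument used above weights each observation when forming the covariance. A minimal standalone illustration with hypothetical responsibilities:

import numpy as np

rng = np.random.RandomState(0)
samples = rng.randn(200, 2)
resp = rng.rand(200)  # hypothetical per-sample responsibilities

weighted_cov = np.cov(samples.T, aweights=resp)
print(weighted_cov.shape)  # (2, 2)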
def test_plot_error_ellipse(self):
    # Generate random data
    x = np.random.normal(0, 1, 300)
    s = np.array([2.0, 2.0])
    y1 = np.random.normal(s[0] * x)
    y2 = np.random.normal(s[1] * x)
    data = np.array([y1, y2])

    # Calculate covariance and plot error ellipse
    cov = np.cov(data)
    plot_error_ellipse([0.0, 0.0], cov)

    debug = False
    if debug:
        plt.scatter(data[0, :], data[1, :])
        plt.xlim([-8, 8])
        plt.ylim([-8, 8])
        plt.show()
    plt.clf()
def test_mean_cov(self):
    with self.test_context():
        self.m.compile()
        num_samples = 1000
        samples = self.hmc.sample(self.m, num_samples=num_samples,
                                  lmin=10, lmax=21, epsilon=0.05)
        self.assertEqual(samples.shape, (num_samples, 2))
        xs = np.array(samples[self.m.x.full_name].tolist(), dtype=np.float32)
        mean = xs.mean(0)
        cov = np.cov(xs.T)
        cov_standard = np.eye(cov.shape[0])

        # TODO(@awav): in spite of the fact that we set up the graph's
        # random seed, the operation seed is still assigned by tensorflow
        # automatically, and hence sample output numbers are not
        # deterministic.
        #
        # self.assertTrue(np.sum(np.abs(mean) < 0.1) >= mean.size/2)
        # assert_allclose(cov, cov_standard, rtol=1e-1, atol=1e-1)
def _init(self, X, lengths=None):
    super(GaussianHMM, self)._init(X, lengths=lengths)

    _, n_features = X.shape
    if hasattr(self, 'n_features') and self.n_features != n_features:
        raise ValueError('Unexpected number of dimensions, got %s but '
                         'expected %s' % (n_features, self.n_features))

    self.n_features = n_features
    if 'm' in self.init_params or not hasattr(self, "means_"):
        kmeans = cluster.KMeans(n_clusters=self.n_components,
                                random_state=self.random_state)
        kmeans.fit(X)
        self.means_ = kmeans.cluster_centers_
    if 'c' in self.init_params or not hasattr(self, "covars_"):
        cv = np.cov(X.T) + self.min_covar * np.eye(X.shape[1])
        if not cv.shape:
            cv.shape = (1, 1)
        self._covars_ = distribute_covar_matrix_to_match_covariance_type(
            cv, self.covariance_type, self.n_components).copy()
def _generate_sample_from_state(self, state, random_state=None):
    if random_state is None:
        random_state = self.random_state
    random_state = check_random_state(random_state)

    cur_means = self.means_[state]
    cur_covs = self.covars_[state]
    cur_weights = self.weights_[state]

    i_gauss = random_state.choice(self.n_mix, p=cur_weights)
    mean = cur_means[i_gauss]
    if self.covariance_type == 'tied':
        cov = cur_covs
    else:
        cov = cur_covs[i_gauss]

    return sample_gaussian(mean, cov, self.covariance_type,
                           random_state=random_state)
def test_trace_sqrt_product_value(self):
    """Test that `trace_sqrt_product` gives the correct value."""
    np.random.seed(0)

    # Make num_examples > num_features to ensure scipy's sqrtm function
    # doesn't return a complex matrix.
    test_pool_real_a = np.float32(np.random.randn(512, 256))
    test_pool_gen_a = np.float32(np.random.randn(512, 256))

    cov_real = np.cov(test_pool_real_a, rowvar=False)
    cov_gen = np.cov(test_pool_gen_a, rowvar=False)

    trace_sqrt_prod_op = _run_with_mock(gan_metrics.trace_sqrt_product,
                                        cov_real, cov_gen)

    with self.test_session() as sess:
        # trace_sqrt_product: tsp
        actual_tsp = sess.run(trace_sqrt_prod_op)

    expected_tsp = _expected_trace_sqrt_product(cov_real, cov_gen)

    self.assertAllClose(actual_tsp, expected_tsp, 0.01)
def calculate_residual_correlation_matrix(returns):
    # find the market return constraining on the selected companies (first PCA)
    # regress each stock on that and find correlation of residuals
    returns_matrix = returns.as_matrix().transpose()
    covar_matrix = np.cov(returns_matrix)
    pca = decomposition.PCA(n_components=1)
    pca.fit(covar_matrix)
    X = pca.transform(covar_matrix)
    regr = linear_model.LinearRegression()
    dim = covar_matrix.shape[1]
    res = np.zeros(shape=(dim, dim))
    for x in range(0, dim):
        regr = linear_model.LinearRegression()
        regr = regr.fit(X, covar_matrix[:, x])
        res[:, x] = covar_matrix[:, x] - regr.predict(X)
    res_corr = np.corrcoef(res)
    return pd.DataFrame(res_corr, index=returns.columns,
                        columns=returns.columns)
def create_zca(imgs, filter_bias=0.1):
    meanX = np.mean(imgs, axis=0)
    covX = np.cov(imgs.T)

    D, E = np.linalg.eigh(covX + filter_bias *
                          np.eye(covX.shape[0], covX.shape[1]))

    assert not np.isnan(D).any()
    assert not np.isnan(E).any()
    assert D.min() > 0

    D **= -.5

    W = np.dot(E, np.dot(np.diag(D), E.T))

    def transform(images):
        return np.dot(images - meanX, W)

    return transform
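A sketch of how the returned transform might be used, assuming imgs is a hypothetical (num_samples, num_features) float array:

import numpy as np

rng = np.random.RandomState(0)
imgs = rng.randn(1000, 16)  # hypothetical flattened images

whiten = create_zca(imgs)
white = whiten(imgs)

# After ZCA whitening the covariance should be close to the identity
# (only approximately, because of the filter_bias regularizer).
print(np.allclose(np.cov(white.T), np.eye(16), atol=0.2))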
def testValueTensorIsIdempotent(self):
    labels = tf.random_normal((10, 3), seed=2)
    predictions = labels * 0.5 + tf.random_normal((10, 3), seed=1) * 0.5
    cov, update_op = metrics.streaming_covariance(predictions, labels)

    with self.test_session() as sess:
        sess.run(tf.initialize_local_variables())

        # Run several updates.
        for _ in range(10):
            sess.run(update_op)

        # Then verify idempotency.
        initial_cov = cov.eval()
        for _ in range(10):
            self.assertEqual(initial_cov, cov.eval())
def testSingleUpdateWithErrorAndWeights(self):
    with self.test_session() as sess:
        predictions = np.array([2, 4, 6, 8])
        labels = np.array([1, 3, 2, 7])
        weights = np.array([0, 1, 3, 1])
        predictions_t = tf.constant(predictions, shape=(1, 4), dtype=tf.float32)
        labels_t = tf.constant(labels, shape=(1, 4), dtype=tf.float32)
        weights_t = tf.constant(weights, shape=(1, 4), dtype=tf.float32)

        pearson_r, update_op = metrics.streaming_pearson_correlation(
            predictions_t, labels_t, weights=weights_t)

        p, l = _reweight(predictions, labels, weights)
        cmat = np.cov(p, l)
        expected_r = cmat[0, 1] / np.sqrt(cmat[0, 0] * cmat[1, 1])
        sess.run(tf.initialize_local_variables())
        self.assertAlmostEqual(expected_r, sess.run(update_op))
        self.assertAlmostEqual(expected_r, pearson_r.eval())
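The expected value here is just the Pearson correlation written in terms of np.cov; for unweighted data the same quantity comes straight from np.corrcoef, as this small check illustrates:

import numpy as np

p = np.array([2.0, 4.0, 6.0, 8.0])
l = np.array([1.0, 3.0, 2.0, 7.0])

cmat = np.cov(p, l)
r_from_cov = cmat[0, 1] / np.sqrt(cmat[0, 0] * cmat[1, 1])
assert np.isclose(r_from_cov, np.corrcoef(p, l)[0, 1])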
def make_random_points(centers, num_points):
    num_centers, num_dims = centers.shape
    assignments = np.random.choice(num_centers, num_points)
    offsets = np.round(np.random.randn(num_points,
                                       num_dims).astype(np.float32) * 20)
    points = centers[assignments] + offsets
    means = [np.mean(points[assignments == center], axis=0)
             for center in xrange(num_centers)]
    covs = [np.cov(points[assignments == center].T)
            for center in xrange(num_centers)]
    scores = []
    for r in xrange(num_points):
        scores.append(np.sqrt(np.dot(
            np.dot(points[r, :] - means[assignments[r]],
                   np.linalg.inv(covs[assignments[r]])),
            points[r, :] - means[assignments[r]])))
    return (points, assignments, scores)
def testValueTensorIsIdempotent(self):
    labels = tf.random_normal((10, 3), seed=2)
    predictions = labels * 0.5 + tf.random_normal((10, 3), seed=1) * 0.5
    cov, update_op = metrics.streaming_covariance(predictions, labels)

    with self.test_session() as sess:
        sess.run(tf.local_variables_initializer())

        # Run several updates.
        for _ in range(10):
            sess.run(update_op)

        # Then verify idempotency.
        initial_cov = cov.eval()
        for _ in range(10):
            self.assertEqual(initial_cov, cov.eval())
def testSingleUpdateWithErrorAndWeights(self):
    with self.test_session() as sess:
        predictions = np.array([2, 4, 6, 8])
        labels = np.array([1, 3, 2, 7])
        weights = np.array([0, 1, 3, 1])
        predictions_t = tf.constant(predictions, shape=(1, 4), dtype=tf.float32)
        labels_t = tf.constant(labels, shape=(1, 4), dtype=tf.float32)
        weights_t = tf.constant(weights, shape=(1, 4), dtype=tf.float32)

        pearson_r, update_op = metrics.streaming_pearson_correlation(
            predictions_t, labels_t, weights=weights_t)

        p, l = _reweight(predictions, labels, weights)
        cmat = np.cov(p, l)
        expected_r = cmat[0, 1] / np.sqrt(cmat[0, 0] * cmat[1, 1])
        sess.run(tf.local_variables_initializer())
        self.assertAlmostEqual(expected_r, sess.run(update_op))
        self.assertAlmostEqual(expected_r, pearson_r.eval())
def test_covariance(self):
    start_time = time.time()
    data = self.data.T
    np_cov = np.cov(data)
    logging.info('Numpy took %f', time.time() - start_time)

    start_time = time.time()
    with self.test_session() as sess:
        op = gmm_ops._covariance(
            tf.constant(data.T, dtype=tf.float32), False)
        op_diag = gmm_ops._covariance(
            tf.constant(data.T, dtype=tf.float32), True)
        tf.global_variables_initializer().run()
        tf_cov = sess.run(op)
        np.testing.assert_array_almost_equal(np_cov, tf_cov)
        logging.info('Tensorflow took %f', time.time() - start_time)
        tf_cov = sess.run(op_diag)
        np.testing.assert_array_almost_equal(
            np.diag(np_cov), np.ravel(tf_cov), decimal=5)
def nancov(a, b, min_periods=None):
    if len(a) != len(b):
        raise AssertionError('Operands to nancov must have same size')

    if min_periods is None:
        min_periods = 1

    valid = notnull(a) & notnull(b)
    if not valid.all():
        a = a[valid]
        b = b[valid]

    if len(a) < min_periods:
        return np.nan

    return np.cov(a, b)[0, 1]
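The same pairwise-complete covariance can be sketched with plain numpy, masking NaNs before calling np.cov (a standalone approximation of the pandas helper above, which also handles None and other nulls via notnull):

import numpy as np

a = np.array([1.0, 2.0, np.nan, 4.0, 5.0])
b = np.array([2.0, np.nan, 3.0, 5.0, 7.0])

valid = ~np.isnan(a) & ~np.isnan(b)
print(np.cov(a[valid], b[valid])[0, 1])  # covariance over shared valid entries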
def test_flex_binary_frame(self):
    def _check(method):
        series = self.frame[1]

        res = getattr(series.rolling(window=10), method)(self.frame)
        res2 = getattr(self.frame.rolling(window=10), method)(series)
        exp = self.frame.apply(lambda x: getattr(
            series.rolling(window=10), method)(x))

        tm.assert_frame_equal(res, exp)
        tm.assert_frame_equal(res2, exp)

        frame2 = self.frame.copy()
        frame2.values[:] = np.random.randn(*frame2.shape)

        res3 = getattr(self.frame.rolling(window=10), method)(frame2)
        exp = DataFrame(dict((k, getattr(self.frame[k].rolling(
            window=10), method)(frame2[k])) for k in self.frame))
        tm.assert_frame_equal(res3, exp)

    methods = ['corr', 'cov']
    for meth in methods:
        _check(meth)
def test_expanding_cov_diff_index(self):
    # GH 7512
    s1 = Series([1, 2, 3], index=[0, 1, 2])
    s2 = Series([1, 3], index=[0, 2])
    result = s1.expanding().cov(s2)
    expected = Series([None, None, 2.0])
    assert_series_equal(result, expected)

    s2a = Series([1, None, 3], index=[0, 1, 2])
    result = s1.expanding().cov(s2a)
    assert_series_equal(result, expected)

    s1 = Series([7, 8, 10], index=[0, 1, 3])
    s2 = Series([7, 9, 10], index=[0, 2, 3])
    result = s1.expanding().cov(s2)
    expected = Series([None, None, None, 4.5])
    assert_series_equal(result, expected)
def test_expanding_cov_pairwise_diff_length(self):
    # GH 7512
    df1 = DataFrame([[1, 5], [3, 2], [3, 9]], columns=['A', 'B'])
    df1a = DataFrame([[1, 5], [3, 9]], index=[0, 2], columns=['A', 'B'])
    df2 = DataFrame([[5, 6], [None, None], [2, 1]], columns=['X', 'Y'])
    df2a = DataFrame([[5, 6], [2, 1]], index=[0, 2], columns=['X', 'Y'])
    result1 = df1.expanding().cov(df2a, pairwise=True)[2]
    result2 = df1.expanding().cov(df2a, pairwise=True)[2]
    result3 = df1a.expanding().cov(df2, pairwise=True)[2]
    result4 = df1a.expanding().cov(df2a, pairwise=True)[2]
    expected = DataFrame([[-3., -5.], [-6., -10.]],
                         index=['A', 'B'], columns=['X', 'Y'])
    assert_frame_equal(result1, expected)
    assert_frame_equal(result2, expected)
    assert_frame_equal(result3, expected)
    assert_frame_equal(result4, expected)
def covariance_matrices(im, labels, return_mm3=True):
    """
    Considers each label as a point distribution in space, and returns the
    covariance matrix of that point distribution.
    :param im: input nibabel image
    :param labels: list of labels input.
    :param return_mm3: if true the answer is in mm, if false in voxel indexes.
    :return: covariance matrix of the point distribution of each label
    """
    cov_matrices = [np.zeros([3, 3])] * len(labels)
    for l_id, l in enumerate(labels):
        coords = np.where(im.get_data() == l)  # returns [X_vector, Y_vector, Z_vector]
        if np.count_nonzero(coords) > 0:
            cov_matrices[l_id] = np.cov(coords)
        else:
            cov_matrices[l_id] = np.nan * np.ones([3, 3])
    if return_mm3:
        cov_matrices = [im.affine[:3, :3].dot(cm.astype(np.float64))
                        for cm in cov_matrices]
    return cov_matrices
def correlation(task, load=True):
    self = mytask
    if load:
        self.initialize(_load=True, _logging=False, _log_dir='other/')

    data = []
    for batch in self.iterate_minibatches('valid'):
        xtrain, ytrain = batch
        ytrain = np.eye(10)[ytrain]
        feed_dict = {self.x: xtrain, self.y: ytrain, self.sigma0: 1.,
                     self.initial_keep_prob: task['initial_keep_prob'],
                     self.is_training: False}
        z = tf.get_collection('log_network')[-1]
        batch_z = self.sess.run(z, feed_dict)
        data.append(batch_z)
    data = np.vstack(data)
    data = data.reshape(data.shape[0], -1)

    def normal_tc(c0):
        c1i = np.diag(1. / np.diag(c0))
        p = np.matmul(c1i, c0)
        return -.5 * np.linalg.slogdet(p)[1] / c0.shape[0]

    c0 = np.cov(data, rowvar=False)
    tc = normal_tc(c0)
    print("Total correlation: %f" % tc)