The following 49 code examples, extracted from open-source Python projects, illustrate how to use scipy.linalg.cho_solve().
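Before the project examples, here is a minimal, self-contained sketch of the basic pattern they all rely on: factor a symmetric positive-definite matrix once with cho_factor (or cholesky) and reuse that factorization in cho_solve. The matrix A and right-hand side b below are made up purely for illustration.

import numpy as np
from scipy.linalg import cho_factor, cho_solve

# Build a symmetric positive-definite matrix A and a right-hand side b (illustrative data).
rng = np.random.default_rng(0)
M = rng.normal(size=(5, 5))
A = M @ M.T + 5 * np.eye(5)
b = rng.normal(size=5)

# Factor once, then reuse the factorization for one or more solves.
c, low = cho_factor(A)        # same idea as passing (L, True) from cholesky(A, lower=True)
x = cho_solve((c, low), b)    # solves A x = b without forming inv(A)

assert np.allclose(A @ x, b)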
def variance_values(self, beta):
    """ Covariance matrix for the estimated function

    Parameters
    ----------
    beta : np.array
        Contains untransformed starting values for latent variables

    Returns
    ----------
    Covariance matrix for the estimated function
    """
    parm = np.array([self.latent_variables.z_list[k].prior.transform(beta[k]) for k in range(beta.shape[0])])
    L = self._L(parm)
    v = la.cho_solve((L, True), self.kernel.K(parm))
    return self.kernel.K(parm) - np.dot(v.T, v)
def compute_gradient_totalcverr_wrt_lambda(self, matrix_results, lambda_val, sigmasq_z):
    # 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
    num_sample_cv = self.num_samples
    ttl_num_folds = np.shape(matrix_results)[1]
    gradient_cverr_per_fold = np.zeros(ttl_num_folds)
    for jj in range(ttl_num_folds):
        uu = np.shape(matrix_results[3][jj])[0]  # number of training samples
        M_tst_tr = exp(matrix_results[2][jj]*float(-1/2)*sigmasq_z**(-1))
        M_tr_tr = exp(matrix_results[3][jj]*float(-1/2)*sigmasq_z**(-1))
        lower_ZZ = cholesky(M_tr_tr + lambda_val*eye(uu), lower=True)
        ZZ = cho_solve((lower_ZZ, True), eye(uu))
        first_term = matrix_results[0][jj].dot(ZZ.dot(ZZ.dot(M_tst_tr.T)))
        second_term = M_tst_tr.dot(ZZ.dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tst_tr.T)))))
        gradient_cverr_per_fold[jj] = trace(first_term - second_term)
    return 2*sum(gradient_cverr_per_fold)/float(num_sample_cv)

# lambda = exp(eta)
def compute_gradient_totalcverr_wrt_sqsigma(self, matrix_results, lambda_val, sigmasq_z):
    # 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
    num_sample_cv = self.num_samples
    ttl_num_folds = np.shape(matrix_results)[1]
    gradient_cverr_per_fold = np.zeros(ttl_num_folds)
    for jj in range(ttl_num_folds):
        uu = np.shape(matrix_results[3][jj])[0]
        log_M_tr_tst = matrix_results[2][jj].T*float(-1/2)*sigmasq_z**(-1)
        M_tr_tst = exp(log_M_tr_tst)
        log_M_tr_tr = matrix_results[3][jj]*float(-1/2)*sigmasq_z**(-1)
        M_tr_tr = exp(log_M_tr_tr)
        lower_ZZ = cholesky(M_tr_tr + lambda_val*eye(uu), lower=True)
        ZZ = cho_solve((lower_ZZ, True), eye(uu))
        term_1 = matrix_results[0][jj].dot(ZZ.dot((M_tr_tr*sigmasq_z**(-1)*(-log_M_tr_tr)).dot(ZZ.dot(M_tr_tst))))
        term_2 = -matrix_results[0][jj].dot(ZZ.dot(M_tr_tst*(-log_M_tr_tst*sigmasq_z**(-1))))
        term_3 = (sigmasq_z**(-1)*(M_tr_tst.T)*(-log_M_tr_tst.T)).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst))))
        term_4 = -(M_tr_tst.T).dot(ZZ.dot((M_tr_tr*sigmasq_z**(-1)*(-log_M_tr_tr)).dot(ZZ.dot(matrix_results[1][jj].dot(
            ZZ.dot(M_tr_tst))))))
        term_5 = -(M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot((M_tr_tr*sigmasq_z**(-1)*(-log_M_tr_tr)).dot(
            ZZ.dot(M_tr_tst))))))
        term_6 = (M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst*sigmasq_z**(-1)*(-log_M_tr_tst)))))
        gradient_cverr_per_fold[jj] = trace(2*term_1 + 2*term_2 + term_3 + term_4 + term_5 + term_6)
    return sum(gradient_cverr_per_fold)/float(num_sample_cv)
def compute_totalcverr(self, matrix_results, lambda_val, sigmasq_z):
    # 0: K_tst_tr; 1: K_tr_tr; 2: K_tst_tst; 3: D_tst_tr; 4: D_tr_tr
    num_sample_cv = self.num_samples
    ttl_num_folds = np.shape(matrix_results)[1]
    cverr_per_fold = np.zeros(ttl_num_folds)
    for jj in range(ttl_num_folds):
        uu = np.shape(matrix_results[4][jj])[0]  # number of training samples
        M_tst_tr = exp(matrix_results[3][jj]*float(-1/2)*sigmasq_z**(-1))
        M_tr_tr = exp(matrix_results[4][jj]*float(-1/2)*sigmasq_z**(-1))
        lower_ZZ = cholesky(M_tr_tr + lambda_val*eye(uu), lower=True)
        ZZ = cho_solve((lower_ZZ, True), eye(uu))
        first_term = matrix_results[2][jj]
        second_term = -matrix_results[0][jj].dot(ZZ.dot(M_tst_tr.T))
        third_term = np.transpose(second_term)
        fourth_term = M_tst_tr.dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tst_tr.T))))
        cverr_per_fold[jj] = trace(first_term + second_term + third_term + fourth_term)
    return sum(cverr_per_fold)/float(num_sample_cv)
def _kalman_correct(x, P, z, H, R, gain_factor, gain_curve):
    PHT = np.dot(P, H.T)

    S = np.dot(H, PHT) + R
    e = z - H.dot(x)
    L = cholesky(S, lower=True)
    inn = solve_triangular(L, e, lower=True)

    if gain_curve is not None:
        q = (np.dot(inn, inn) / inn.shape[0]) ** 0.5
        f = gain_curve(q)
        if f == 0:
            return inn
        L *= (q / f) ** 0.5

    K = cho_solve((L, True), PHT.T, overwrite_b=True).T
    if gain_factor is not None:
        K *= gain_factor[:, None]

    U = -K.dot(H)
    U[np.diag_indices_from(U)] += 1
    x += K.dot(z - H.dot(x))
    P[:] = U.dot(P).dot(U.T) + K.dot(R).dot(K.T)

    return inn
def project_onto_nullspace(mom, cache):
    """ Projects a momentum on to the nullspace of the constraint Jacobian.

    Parameters
    ----------
    mom : vector
        Momentum state to project.
    cache : dictionary
        Dictionary of cached constraint Jacobian results.

    Returns
    -------
    mom : vector
        Momentum state projected into nullspace of constraint Jacobian.
    """
    dc_dpos = cache['dc_dpos']
    if 'gram_chol' not in cache:
        cache['gram_chol'] = la.cho_factor(dc_dpos.dot(dc_dpos.T))
    gram_chol = cache['gram_chol']
    dc_dpos_mom = dc_dpos.dot(mom)
    gram_inv_dc_dpos_mom = la.cho_solve(gram_chol, dc_dpos_mom)
    dc_dpos_pinv_dc_dpos_mom = dc_dpos.T.dot(gram_inv_dc_dpos_mom)
    mom -= dc_dpos_pinv_dc_dpos_mom
    return mom
def test_kinetic_energy(self):
    mom_resample_coeff = 1.
    dtype = np.float64
    for n_dim in [10, 100, 1000]:
        mass_matrix = self.prng.normal(size=(n_dim, n_dim))
        mass_matrix = mass_matrix.dot(mass_matrix.T)
        mass_matrix_chol = la.cholesky(mass_matrix, lower=True)
        sampler = uhmc.EuclideanMetricHmcSampler(
            energy_func=energy_func,
            mass_matrix=mass_matrix,
            energy_grad=energy_grad,
            prng=self.prng,
            mom_resample_coeff=mom_resample_coeff,
            dtype=dtype)
        pos, mom = self.prng.normal(size=(2, n_dim,)).astype(dtype)
        k_energy = sampler.kinetic_energy(pos, mom, {})
        assert np.isscalar(k_energy), (
            'kinetic_energy returning non-scalar value.'
        )
        assert np.allclose(
            k_energy,
            0.5 * mom.dot(la.cho_solve((mass_matrix_chol, True), mom))), (
            'kinetic_energy returning incorrect value.'
        )
def nci(x, m, P):
    # dimension of state, # time steps, # MC simulations, # inference algorithms (filters/smoothers)
    d, time, mc_sims, algs = m.shape
    dx = x[..., na] - m
    # Mean Square Error matrix
    MSE = np.empty((d, d, time, mc_sims, algs))
    for k in range(time):
        for s in range(mc_sims):
            for alg in range(algs):
                MSE[..., k, s, alg] = np.outer(dx[..., k, s, alg], dx[..., k, s, alg])
    MSE = MSE.mean(axis=3)  # average over MC simulations

    # dx_iP_dx = np.empty((1, time, mc_sims, algs))
    NCI = np.empty((1, time, mc_sims, algs))
    for k in range(1, time):
        for s in range(mc_sims):
            for alg in range(algs):
                # iP_dx = cho_solve(cho_factor(P[:, :, k, s, alg]), dx[:, k, s, alg])
                # dx_iP_dx[:, k, s, alg] = dx[:, k, s, alg].dot(iP_dx)
                # iMSE_dx = cho_solve(cho_factor(MSE[..., k, fi]), dx[:, k, s, alg])
                # NCI[..., k, s, fi] = 10*np.log10(dx_iP_dx[:, k, s, fi]) - 10*np.log10(dx[:, k, s, alg].dot(iMSE_dx))
                dx_iP_dx = dx[:, k, s, alg].dot(np.linalg.inv(P[..., k, s, alg])).dot(dx[:, k, s, alg])
                dx_iMSE_dx = dx[:, k, s, alg].dot(np.linalg.inv(MSE[..., k, alg])).dot(dx[:, k, s, alg])
                NCI[..., k, s, alg] = 10 * np.log10(dx_iP_dx) - 10 * np.log10(dx_iMSE_dx)
    return NCI[:, 1:, ...].mean(axis=1)  # average over time steps (ignore the 1st)
def grad_input(self, x):
    """Compute the gradient of the mean function and the standard deviation
    function at the provided input.
    """
    # Compute the gradient of the mean function.
    d_kernel = self.kernel.grad_input(x, self.X)
    d_mean = d_kernel.T.dot(self.alpha)
    # Compute the gradient of the standard deviation function. It is
    # absolutely crucial to note that the predict method returns the
    # variance, not the standard deviation, of the prediction.
    sd = self.predict(x)[1]
    K_cross = self.kernel.cov(x, self.X)
    M = spla.cho_solve((self.L, True), K_cross.T).ravel()
    d_sd = -d_kernel.T.dot(M) / sd
    return d_mean, d_sd
def fit(self, X, y):
    """Fit the Bayesian linear regression model leveraging the available
    data.

    Parameters:
        X (numpy array): A two-dimensional numpy array representing the
            matrix of covariates. Note that if a bias term is expressly
            desired, it must be included in the design matrix.
        y (numpy array): A matrix of target variables to predict.
    """
    P = X.T.dot(X) + self.prior_prec
    L = spla.cholesky(P, lower=True)
    self.mu = spla.cho_solve((L, True), X.T.dot(y))
    self.sigma_sq = np.mean((X.dot(self.mu) - y) ** 2)
    L_inv = spla.solve_triangular(L.T, np.eye(L.shape[0]))
    self.cov = self.sigma_sq * L_inv.dot(L_inv.T)
def chol_inv(B, lower=True):
    """ Returns the inverse of matrix A, where A = B*B.T,
    i.e. B is the Cholesky decomposition of A.

    Solves Ax = I given B is the Cholesky factorization of A.
    """
    return cho_solve((B, lower), np.eye(B.shape[0]))
def kalman_filter(y, H, R, F, Q, mu, PI, z=None):
    """
    Given the following sequences (one item of given dimension for each
    time step):

    - *y*: measurements (M)
    - *H*: measurement operator (MxN)
    - *R*: measurement noise covariance (MxM)
    - *F*: time update operator (NxN)
    - *Q*: time update noise covariance (NxN)
    - *mu*: initial state (N)
    - *PI*: initial state covariance (NxN)
    - *z*: (optional) systematic time update input (N)

    Return the :class:`FilterResult` containing lists of posterior state
    estimates and error covariances.
    """
    x_hat = []
    P = []
    x_hat_prior = mu
    P_prior = PI
    if z is None:
        z = repeat(None)
    for i, (y_i, H_i, R_i, F_i, Q_i, z_i) in enumerate(izip(y, H, R, F, Q, z)):
        # measurement update
        A = cho_factor(NP.matmul(H_i, NP.matmul(P_prior, H_i.T)) + R_i)
        B = cho_solve(A, NP.matmul(H_i, P_prior))
        b = cho_solve(A, y_i - NP.matmul(H_i, x_hat_prior))
        C = NP.matmul(P_prior, H_i.T)
        x_hat.append(x_hat_prior + NP.matmul(C, b))
        P.append(P_prior - NP.matmul(C, B))
        # time update
        x_hat_prior = NP.matmul(F_i, x_hat[-1])
        if z_i is not None:
            x_hat_prior += z_i
        P_prior = NP.matmul(F_i, NP.matmul(P[-1], F_i.T)) + Q_i
    return FilterResult(x_hat, P)
def KLdiv(mu0, Lcov0, mu1, Lcov1):
    """Numpy KL calculation."""
    tr, dist, ldet = 0., 0., 0.
    D, n = mu0.shape
    for m0, m1, L0, L1 in zip(mu0, mu1, Lcov0, Lcov1):
        tr += np.trace(cho_solve((L1, True), L0.dot(L0.T)))
        md = m1 - m0
        dist += md.dot(cho_solve((L1, True), md))
        ldet += logdet(L1) - logdet(L0)

    KL = 0.5 * (tr + dist + ldet - D * n)
    return KL
def _alpha(self, L):
    """ Covariance-derived term to construct expectations.
    See Rasmussen & Williams.

    Parameters
    ----------
    L : np.ndarray
        Cholesky triangular

    Returns
    ----------
    np.ndarray (alpha)
    """
    return la.cho_solve((L.T, True), la.cho_solve((L, True), np.transpose(self.data)))
def chol2inv(chol):
    return spla.cho_solve((chol, False), np.eye(chol.shape[0]))
def chol2inv(chol):
    return spla.cho_solve((chol, False), np.eye(chol.shape[0]))
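The two chol2inv helpers above pass False as the triangularity flag, which matches the upper-triangular factor that scipy.linalg.cholesky and cho_factor return by default, whereas most of the other examples pass True together with a lower-triangular factor. A minimal sketch of that convention (the matrix A here is illustrative):

import numpy as np
from scipy.linalg import cho_solve, cholesky

A = np.array([[4.0, 1.0], [1.0, 3.0]])  # small SPD example matrix

# Upper-triangular factor (the default), so the flag passed to cho_solve is False.
U = cholesky(A)  # same as cholesky(A, lower=False)
A_inv_upper = cho_solve((U, False), np.eye(A.shape[0]))

# Lower-triangular factor, so the flag is True (the pattern used in most examples on this page).
L = cholesky(A, lower=True)
A_inv_lower = cho_solve((L, True), np.eye(A.shape[0]))

assert np.allclose(A_inv_upper, A_inv_lower)
assert np.allclose(A @ A_inv_upper, np.eye(2))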
def __init__(self, gamma, beta, nugget, kernelName, k_lambda, xTrain, yTrain):
    """ Create a new GaussianProcess Object

    gamma: Hyperparameter
    beta: Hyperparameter
    k_lambda: Hyperparameter
    nugget: The noise hyperparameter
    kernelName: The name of the covariance kernel
    xTrain: Numpy array containing x training values
    yTrain: Numpy array containing y training values
    """
    self.xTrain = xTrain
    self.yTrain = yTrain
    self.k_lambda = k_lambda
    self.beta = beta
    self.gamma = gamma
    self.nugget = nugget
    self.kernelName = kernelName

    # Setup the regressor as if gp.fit had been called
    # See https://github.com/scikit-learn/scikit-learn/master/sklearn/gaussian_process/gpr.py
    kernel = self._getKernel()
    gp = GaussianProcessRegressor(kernel=kernel, n_restarts_optimizer=0)
    gp.K = kernel(xTrain)
    gp.X_train_ = xTrain
    gp.y_train_ = yTrain
    gp.L_ = cholesky(gp.K, lower=True)
    gp.alpha_ = cho_solve((gp.L_, True), yTrain)
    gp.fit(xTrain, yTrain)
    gp.kernel_ = kernel

    self.gp = gp
    self.kernel = kernel

    # Calculate the matrix inverses once to save time later.
    # This is only used for our own implementation of the scoring engine.
    self.L_inv = solve_triangular(gp.L_.T, np.eye(gp.L_.shape[0]))
    self.K_inv = self.L_inv.dot(self.L_inv.T)
def solve(self, other):
    if other.ndim == 1:
        Nx = np.array(other / self.N)
    elif other.ndim == 2:
        Nx = np.array(other / self.N[:,None])

    UNx = np.dot(self.U.T, Nx)

    Sigma = np.diag(1/self.J) + np.dot(self.U.T, self.U/self.N[:,None])
    cf = sl.cho_factor(Sigma)

    if UNx.ndim == 1:
        tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N
    else:
        tmp = np.dot(self.U, sl.cho_solve(cf, UNx)) / self.N[:,None]

    return Nx - tmp
def __call__(self, other):
    return sl.cho_solve(self.cf, other)
def inv(self):
    return sl.cho_solve(self.cf, np.eye(len(self.cf[0])))
def _solve_ZNX(self, X, Z):
    """Solves :math:`Z^T N^{-1}X`, where :math:`X`
    and :math:`Z` are 1-d or 2-d arrays.
    """
    if X.ndim == 1:
        X = X.reshape(X.shape[0], 1)
    if Z.ndim == 1:
        Z = Z.reshape(Z.shape[0], 1)

    n, m = Z.shape[1], X.shape[1]
    ZNX = np.zeros((n, m))
    if len(self._idx) > 0:
        ZNXr = np.dot(Z[self._idx,:].T, X[self._idx,:] / self._nvec[self._idx, None])
    else:
        ZNXr = 0
    for slc, block in zip(self._slices, self._blocks):
        Zblock = Z[slc, :]
        Xblock = X[slc, :]

        if slc.stop - slc.start > 1:
            cf = sl.cho_factor(block + np.diag(self._nvec[slc]))
            bx = sl.cho_solve(cf, Xblock)
        else:
            bx = Xblock / self._nvec[slc][:, None]
        ZNX += np.dot(Zblock.T, bx)
    ZNX += ZNXr
    return ZNX.squeeze() if len(ZNX) > 1 else float(ZNX)
def _solve_NX(self, X):
    """Solves :math:`N^{-1}X`, where :math:`X` is a 1-d or 2-d array.
    """
    if X.ndim == 1:
        X = X.reshape(X.shape[0], 1)

    NX = X / self._nvec[:,None]
    for slc, block in zip(self._slices, self._blocks):
        Xblock = X[slc, :]
        if slc.stop - slc.start > 1:
            cf = sl.cho_factor(block + np.diag(self._nvec[slc]))
            NX[slc] = sl.cho_solve(cf, Xblock)
    return NX.squeeze()
def solve(self, Y):
    return la.cho_solve(self._cholesky, Y)
def compute_gradient_totalcverr_wrt_eta(self, matrix_results, lambda_val, sigmasq_z):
    # 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
    # eta = log(lambda_val)
    # gamma = log(sigmasq_z)
    num_sample_cv = self.num_samples
    ttl_num_folds = np.shape(matrix_results)[1]
    gradient_cverr_per_fold = np.zeros(ttl_num_folds)
    for jj in range(ttl_num_folds):
        uu = np.shape(matrix_results[3][jj])[0]  # number of training samples
        M_tst_tr = exp(matrix_results[2][jj]*float(-1/2)*sigmasq_z**(-1))
        M_tr_tr = exp(matrix_results[3][jj]*float(-1/2)*sigmasq_z**(-1))
        lower_ZZ = cholesky(M_tr_tr + lambda_val*eye(uu), lower=True)
        ZZ = cho_solve((lower_ZZ, True), eye(uu))
        EE = lambda_val*eye(uu)
        first_term = matrix_results[0][jj].dot(ZZ.dot(EE.dot(ZZ.dot(M_tst_tr.T))))
        second_term = first_term.T
        third_term = -M_tst_tr.dot(ZZ.dot(EE.dot(ZZ.dot(
            matrix_results[1][jj].dot(ZZ.dot(M_tst_tr.T))))))
        forth_term = -M_tst_tr.dot(ZZ.dot(
            matrix_results[1][jj].dot(ZZ.dot(EE.dot(ZZ.dot(M_tst_tr.T))))))
        gradient_cverr_per_fold[jj] = trace(first_term + second_term + third_term + forth_term)
    return sum(gradient_cverr_per_fold)/float(num_sample_cv)

# compute the gradient of the total cverror with respect to sigma_z squared
def compute_gradient_totalcverr_wrt_gamma(self, matrix_results, lambda_val, sigmasq_z):
    # 0: K_tst_tr; 1: K_tr_tr; 2: D_tst_tr; 3: D_tr_tr
    # eta = log(lambda_val)
    # gamma = log(sigmasq_z)
    num_sample_cv = self.num_samples
    ttl_num_folds = np.shape(matrix_results)[1]
    gradient_cverr_per_fold = np.zeros(ttl_num_folds)
    for jj in range(ttl_num_folds):
        uu = np.shape(matrix_results[3][jj])[0]
        log_M_tr_tst = matrix_results[2][jj].T*float(-1/2)*sigmasq_z**(-1)
        M_tr_tst = exp(log_M_tr_tst)
        log_M_tr_tr = matrix_results[3][jj]*float(-1/2)*sigmasq_z**(-1)
        M_tr_tr = exp(log_M_tr_tr)
        lower_ZZ = cholesky(M_tr_tr + lambda_val*eye(uu), lower=True)
        ZZ = cho_solve((lower_ZZ, True), eye(uu))
        term_1 = matrix_results[0][jj].dot(ZZ.dot((M_tr_tr*(-log_M_tr_tr)).dot(ZZ.dot(M_tr_tst))))
        term_2 = -matrix_results[0][jj].dot(ZZ.dot(M_tr_tst*(-log_M_tr_tst)))
        term_3 = (M_tr_tst.T*(-log_M_tr_tst).T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst))))
        term_4 = -(M_tr_tst.T).dot(ZZ.dot((M_tr_tr*(-log_M_tr_tr)).dot(ZZ.dot(matrix_results[1][jj].dot(
            ZZ.dot(M_tr_tst))))))
        term_5 = -(M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot((M_tr_tr*(-log_M_tr_tr)).dot(
            ZZ.dot(M_tr_tst))))))
        term_6 = (M_tr_tst.T).dot(ZZ.dot(matrix_results[1][jj].dot(ZZ.dot(M_tr_tst*(-log_M_tr_tst)))))
        gradient_cverr_per_fold[jj] = trace(2*term_1 + 2*term_2 + term_3 + term_4 + term_5 + term_6)
    return sum(gradient_cverr_per_fold)/float(num_sample_cv)

# compute the total cverror
def _rts_pass(x, P, xa, Pa, Phi):
    n_points, n_states = x.shape
    I = np.identity(n_states)
    for i in reversed(range(n_points - 1)):
        L = cholesky(Pa[i + 1], check_finite=False)
        Pa_inv = cho_solve((L, False), I, check_finite=False)
        C = P[i].dot(Phi[i].T).dot(Pa_inv)
        x[i] += C.dot(x[i + 1] - xa[i + 1])
        P[i] += C.dot(P[i + 1] - Pa[i + 1]).dot(C.T)
    return x, P
def kinetic_energy(self, pos, mom, cache={}):
    return 0.5 * mom.dot(la.cho_solve((self.mass_matrix_chol, True), mom))
def simulate_dynamic(self, n_step, dt, pos, mom, cache={}):
    mom = mom - 0.5 * dt * self.energy_grad(pos, cache)
    pos = pos + dt * la.cho_solve((self.mass_matrix_chol, True), mom)
    for s in range(1, n_step):
        mom -= dt * self.energy_grad(pos, cache)
        pos += dt * la.cho_solve((self.mass_matrix_chol, True), mom)
    mom -= 0.5 * dt * self.energy_grad(pos, cache)
    return pos, mom, None
def _measurement_update(self, y):
    gain = cho_solve(cho_factor(self.z_cov_pred), self.xz_cov).T
    self.x_mean_filt = self.x_mean_pred + gain.dot(y - self.z_mean_pred)
    self.x_cov_filt = self.x_cov_pred - gain.dot(self.z_cov_pred).dot(gain.T)
def _smoothing_update(self):
    gain = cho_solve(cho_factor(self.x_cov_pred), self.xx_cov).T
    self.x_mean_smooth = self.x_mean_filt + gain.dot(self.x_mean_smooth - self.x_mean_pred)
    self.x_cov_smooth = self.x_cov_filt + gain.dot(self.x_cov_smooth - self.x_cov_pred).dot(gain.T)
def weights_rbf(self, unit_sp, hypers):
    # BQ weights for RBF kernel with given hypers, computations adopted from the GP-ADF code [Deisenroth] with
    # the following assumptions:
    #   (A1) the uncertain input is zero-mean with unit covariance
    #   (A2) one set of hyper-parameters is used for all output dimensions (one GP models all outputs)
    d, n = unit_sp.shape
    # GP kernel hyper-parameters
    alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
    assert len(el) == d
    # pre-allocation for convenience
    eye_d, eye_n = np.eye(d), np.eye(n)

    iLam1 = np.atleast_2d(np.diag(el ** -1))  # sqrt(Lambda^-1)
    iLam2 = np.atleast_2d(np.diag(el ** -2))

    inp = unit_sp.T.dot(iLam1)  # sigmas / el[:, na], (x - m)^T*sqrt(Lambda^-1)  # (numSP, xdim)
    K = np.exp(2 * np.log(alpha) - 0.5 * maha(inp, inp))
    iK = cho_solve(cho_factor(K + jitter * eye_n), eye_n)
    B = iLam2 + eye_d  # (D, D)
    c = alpha ** 2 / np.sqrt(det(B))
    t = inp.dot(inv(B))  # inn*(P + Lambda)^-1
    l = np.exp(-0.5 * np.sum(inp * t, 1))  # (N, 1)
    zet = 2 * np.log(alpha) - 0.5 * np.sum(inp * inp, 1)
    inp = inp.dot(iLam1)
    R = 2 * iLam2 + eye_d
    t = 1 / np.sqrt(det(R))
    L = np.exp((zet[:, na] + zet[:, na].T) + maha(inp, -inp, V=0.5 * inv(R)))
    q = c * l  # evaluations of the kernel mean map (from the viewpoint of RKHS methods)

    # mean weights
    wm_f = q.dot(iK)
    iKQ = iK.dot(t * L)
    # covariance weights
    wc_f = iKQ.dot(iK)
    # cross-covariance "weights"
    wc_fx = np.diag(q).dot(iK)
    # used for self.D.dot(x - mean).dot(wc_fx).dot(fx)
    self.D = inv(eye_d + np.diag(el ** 2))  # S(S+Lam)^-1; for S=I, (I+Lam)^-1
    # model variance; to be added to the covariance
    # this diagonal form assumes independent GP outputs (cov(f^a, f^b) = 0 for all a, b: a neq b)
    self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones((d, 1)))
    return wm_f, wc_f, wc_fx
def plot_gp_model(self, f, unit_sp, args, test_range=(-5, 5, 50), plot_dims=(0, 0)):
    # plot out_dim vs. in_dim
    in_dim, out_dim = plot_dims
    # test input must have the same dimension as specified in kernel
    test = np.linspace(*test_range)
    test_pts = np.zeros((self.d, len(test)))
    test_pts[in_dim, :] = test
    # function value observations at training points (unit sigma-points)
    y = np.apply_along_axis(f, 0, unit_sp, args)
    fx = np.apply_along_axis(f, 0, test_pts, args)  # function values at test points
    K = self.kern.K(unit_sp.T)  # covariances between sigma-points
    k = self.kern.K(test_pts.T, unit_sp.T)  # covariance between test inputs and sigma-points
    kxx = self.kern.Kdiag(test_pts.T)  # prior predictive variance
    k_iK = cho_solve(cho_factor(K), k.T).T
    gp_mean = k_iK.dot(y[out_dim, :])  # GP mean
    gp_var = np.diag(np.diag(kxx) - k_iK.dot(k.T))  # GP predictive variance
    # plot the GP mean, predictive variance and the true function
    plt.figure()
    plt.plot(test, fx[out_dim, :], color='r', ls='--', lw=2, label='true')
    plt.plot(test, gp_mean, color='b', ls='-', lw=2, label='GP mean')
    plt.fill_between(test, gp_mean + 2 * np.sqrt(gp_var), gp_mean - 2 * np.sqrt(gp_var),
                     color='b', alpha=0.25, label='GP variance')
    plt.plot(unit_sp[in_dim, :], y[out_dim, :],
             color='k', ls='', marker='o', ms=8, label='data')
    plt.legend()
    plt.show()
def _pull_from_cache_or_compute(self):
    if self.caching and len(self._cache_list) == self.num_states:
        chol = self._cache_list[self.state]['chol']
        alpha = self._cache_list[self.state]['alpha']
    else:
        chol = spla.cholesky(self.kernel.cov(self.inputs), lower=True)
        alpha = spla.cho_solve((chol, True), self.values - self.mean.value)
    return chol, alpha
def _prepare_cache(self):
    inputs_hash = hash(self.inputs.tostring())
    for i in xrange(self.num_states):
        self.set_state(i)
        chol = spla.cholesky(self.kernel.cov(self.inputs), lower=True)
        alpha = spla.cho_solve((chol, True), self.values - self.mean.value)
        cache_dict = {
            'chol': chol,
            'alpha': alpha
        }
        self._cache_list.append(cache_dict)
def log_likelihood(self):
    """ GP Marginal likelihood

    Notes
    -----
    This is called by the samplers when fitting the hyperparameters.
    """
    cov = self.kernel.cov(self.observed_inputs)
    chol = spla.cholesky(cov, lower=True)
    solve = spla.cho_solve((chol, True), self.observed_values - self.mean.value)
    # Uses the identity that log det A = log prod diag chol A = sum log diag chol A
    return -np.sum(np.log(np.diag(chol))) - 0.5*np.dot(self.observed_values - self.mean.value, solve)
def fit(self, X, y):
    """Fit the parameters of the probabilistic process based on the
    available training data.
    """
    # Store the training data (both the inputs and the targets).
    self.X, self.y = X, y
    # Compute the covariance matrix of the observed inputs.
    K = self.kernel.cov(self.X)
    # For a numerically stable algorithm, we use Cholesky decomposition.
    self.L = spla.cholesky(K, lower=True)
    self.alpha = spla.cho_solve((self.L, True), self.y).ravel()
    L_inv = spla.solve_triangular(self.L.T, np.eye(self.L.shape[0]))
    self.K_inv = L_inv.dot(L_inv.T)
    self.beta = self.y.dot(self.alpha)
def _construct_predict(self, beta, h):
    """ Creates h-step ahead forecasts for the Gaussian process

    Parameters
    ----------
    beta : np.array
        Contains untransformed starting values for the latent variables

    h : int
        How many steps ahead to forecast

    Returns
    ----------
    - predictions
    - variance of predictions
    """
    # Refactor this entire code in future
    parm = np.array([self.latent_variables.z_list[k].prior.transform(beta[k]) for k in range(beta.shape[0])])
    Xstart = self.X().copy()
    Xstart = [i for i in Xstart]
    predictions = np.zeros(h)
    variances = np.zeros(h)

    for step in range(0, h):
        Xstar = []

        for lag in range(0, self.max_lag):
            if lag == 0:
                if step == 0:
                    Xstar.append([self.data[-1]])
                    Xstart[0] = np.append(Xstart[0], self.data[-1])
                else:
                    Xstar.append([predictions[step-1]])
                    Xstart[0] = np.append(Xstart[0], predictions[step-1])
            else:
                Xstar.append([Xstart[lag-1][-2]])
                Xstart[lag] = np.append(Xstart[lag], Xstart[lag-1][-2])

        Kstar = self.kernel.Kstar(parm, np.transpose(np.array(Xstar)))

        L = self._L(parm)
        alpha = self._alpha(L)

        predictions[step] = np.dot(np.transpose(Kstar), alpha)
        v = la.cho_solve((L, True), Kstar)
        variances[step] = self.kernel.Kstarstar(parm, np.transpose(np.array(Xstar))) - np.dot(v.T, v)

    return predictions, variances, predictions - 1.98*np.power(variances, 0.5), predictions + 1.98*np.power(variances, 0.5)
def evaluate(self, theta_new, t=None):
    """Evaluate the acquisition function at the location theta_new.

    Parameters
    ----------
    theta_new : array_like
        Evaluation coordinates.
    t : int, optional
        Current iteration, (unused).

    Returns
    -------
    array_like
        Expected loss's term dependent on theta_new.
    """
    gp = self.model
    n_imp, n_dim = self.points_int.shape

    # Alter the shape of theta_new.
    if n_dim != 1 and theta_new.ndim == 1:
        theta_new = theta_new[np.newaxis, :]
    elif n_dim == 1 and theta_new.ndim == 1:
        theta_new = theta_new[:, np.newaxis]

    # Calculate the integrand term w.
    # Note: w's second term (given in Järvenpää et al., 2017) is dismissed
    # because it is constant with respect to theta_new.
    _, var_new = gp.predict(theta_new, noiseless=True)
    k_old_new = self._K(self.thetas_old, theta_new)
    k_int_new = self._K(self.points_int, theta_new).T
    # Using the Cholesky factorisation to avoid computing matrix inverse.
    term_chol = sl.cho_solve(sl.cho_factor(self.K), k_old_new)
    cov_int = k_int_new - np.dot(self.k_int_old.T, term_chol).T
    delta_var_int = cov_int**2 / (self.sigma2_n + var_new)
    a = np.sqrt((self.sigma2_n + self.var_int.T - delta_var_int) /
                (self.sigma2_n + self.var_int.T + delta_var_int))
    # Using the skewnorm's cdf to substitute the Owen's T function.
    phi_skew_imp = ss.skewnorm.cdf(self.eps, a, loc=self.mean_int.T,
                                   scale=np.sqrt(self.sigma2_n + self.var_int.T))
    w = ((self.phi_int - phi_skew_imp) / 2)

    loss_theta_new = 2 * np.sum(self.omegas_int * self.priors_int * w, axis=1)
    return loss_theta_new
def __call__(self, xs, phiinv_method='partition'):
    # map parameter vector if needed
    params = xs if isinstance(xs, dict) else self.pta.map_params(xs)

    # phiinvs will be a list or may be a big matrix if spatially
    # correlated signals
    TNrs = self.pta.get_TNr(params)
    TNTs = self.pta.get_TNT(params)
    phiinvs = self.pta.get_phiinv(params, logdet=True, method=phiinv_method)

    # get -0.5 * (rNr + logdet_N) piece of likelihood
    loglike = -0.5 * np.sum([l for l in self.pta.get_rNr_logdet(params)])

    # red noise piece
    if self.pta._commonsignals:
        phiinv, logdet_phi = phiinvs
        Sigma = self._make_sigma(TNTs, phiinv)
        TNr = np.concatenate(TNrs)

        cf = cholesky(Sigma)
        expval = cf(TNr)

        logdet_sigma = cf.logdet()

        loglike += 0.5*(np.dot(TNr, expval) - logdet_sigma - logdet_phi)
    else:
        for TNr, TNT, (phiinv, logdet_phi) in zip(TNrs, TNTs, phiinvs):
            Sigma = TNT + (np.diag(phiinv) if phiinv.ndim == 1 else phiinv)

            try:
                cf = sl.cho_factor(Sigma)
                expval = sl.cho_solve(cf, TNr)
            except:
                return -np.inf

            logdet_sigma = np.sum(2 * np.log(np.diag(cf[0])))

            loglike += 0.5*(np.dot(TNr, expval) - logdet_sigma - logdet_phi)

    return loglike
def get_phiinv_byfreq_cliques(self, params, logdet=False, cholesky=False):
    phi = self.get_phi(params, cliques=True)

    if isinstance(phi, list):
        return [None if phivec is None else phivec.inv(logdet)
                for phivec in phi]
    else:
        ld = 0

        # first invert all the cliques
        for clcount in range(self._clcount):
            idx = (self._cliques == clcount)

            if np.any(idx):
                idx2 = np.ix_(idx, idx)

                if cholesky:
                    cf = sl.cho_factor(phi[idx2])

                    if logdet:
                        ld += 2.0*np.sum(np.log(np.diag(cf[0])))

                    phi[idx2] = sl.cho_solve(
                        cf, np.identity(cf[0].shape[0]))
                else:
                    phi2 = phi[idx2]

                    if logdet:
                        ld += np.linalg.slogdet(phi2)[1]

                    phi[idx2] = np.linalg.inv(phi2)

        # then do the pure diagonal terms
        idx = (self._cliques == -1)

        if logdet:
            ld += np.sum(np.log(phi[idx, idx]))

        phi[idx, idx] = 1.0/phi[idx, idx]

        return (phi, ld) if logdet else phi

# we use "cliques" to account for sparse non-diagonal Phi matrices
# for each value in self._cliques, the matrix indices with that value form
# an independent submatrix that can be inverted separately

# reset clique index
def normal_eq_comb(AtA, AtB, PassSet=None):
    """ Solve many systems of linear equations using combinatorial grouping.

    M. H. Van Benthem and M. R. Keenan, J. Chemometrics 2004; 18: 441-450

    Parameters
    ----------
    AtA : numpy.array, shape (n,n)
    AtB : numpy.array, shape (n,k)

    Returns
    -------
    (Z, num_cholesky, num_eq)
    Z : numpy.array, shape (n,k) - solution
    num_cholesky : int - the number of unique cholesky decompositions done
    num_eq : int - the number of systems of linear equations solved
    """
    num_cholesky = 0
    num_eq = 0
    if AtB.size == 0:
        Z = np.zeros([])
    elif (PassSet is None) or np.all(PassSet):
        Z = nla.solve(AtA, AtB)
        num_cholesky = 1
        num_eq = AtB.shape[1]
    else:
        Z = np.zeros(AtB.shape)
        if PassSet.shape[1] == 1:
            if np.any(PassSet):
                cols = PassSet.nonzero()[0]
                Z[cols] = nla.solve(AtA[np.ix_(cols, cols)], AtB[cols])
                num_cholesky = 1
                num_eq = 1
        else:
            #
            # Both _column_group_loop() and _column_group_recursive() work well.
            # Based on preliminary testing,
            # _column_group_loop() is slightly faster for tiny k(<10), but
            # _column_group_recursive() is faster for large k's.
            #
            grps = _column_group_recursive(PassSet)
            for gr in grps:
                cols = PassSet[:, gr[0]].nonzero()[0]
                if cols.size > 0:
                    ix1 = np.ix_(cols, gr)
                    ix2 = np.ix_(cols, cols)
                    #
                    # scipy.linalg.cho_solve can be used instead of numpy.linalg.solve.
                    # For small n(<200), numpy.linalg.solve appears faster, whereas
                    # for large n(>500), scipy.linalg.cho_solve appears faster.
                    # Usage example of scipy.linalg.cho_solve:
                    # Z[ix1] = sla.cho_solve(sla.cho_factor(AtA[ix2]), AtB[ix1])
                    #
                    Z[ix1] = nla.solve(AtA[ix2], AtB[ix1])
                    num_cholesky += 1
                    num_eq += len(gr)
    return Z, num_cholesky, num_eq
def weights_rbf(self, unit_sp, hypers):
    d, n = unit_sp.shape
    # GP kernel hyper-parameters
    alpha, el, jitter = hypers['sig_var'], hypers['lengthscale'], hypers['noise_var']
    assert len(el) == d
    # pre-allocation for convenience
    eye_d, eye_n, eye_y = np.eye(d), np.eye(n), np.eye(n + d * n)

    K = self.kern_eq_der(unit_sp, hypers)  # evaluate kernel matrix BOTTLENECK
    iK = cho_solve(cho_factor(K + jitter * eye_y), eye_y)  # invert kernel matrix BOTTLENECK
    Lam = np.diag(el ** 2)
    iLam = np.diag(el ** -1)  # sqrt(Lambda^-1)
    iiLam = np.diag(el ** -2)  # Lambda^-1
    inn = iLam.dot(unit_sp)  # (x-m)^T*iLam  # (N, D)
    B = iiLam + eye_d  # P*Lambda^-1+I, (P+Lam)^-1 = Lam^-1*(P*Lam^-1+I)^-1  # (D, D)
    cho_B = cho_factor(B)
    t = cho_solve(cho_B, inn)  # dot(inn, inv(B)), (x-m)^T*iLam*(P+Lambda)^-1  # (D, N)
    l = np.exp(-0.5 * np.sum(inn * t, 0))  # (N, 1)
    q = (alpha ** 2 / np.sqrt(det(B))) * l  # (N, 1)
    Sig_q = cho_solve(cho_B, eye_d)  # B^-1*I
    eta = Sig_q.dot(unit_sp)  # (D,N) Sig_q*x
    mu_q = iiLam.dot(eta)  # (D,N)
    r = q[na, :] * iiLam.dot(mu_q - unit_sp)  # -t.dot(iLam) * q  # (D, N)
    q_tilde = np.hstack((q.T, r.T.ravel()))  # (1, N+N*D)

    # weights for mean
    wm = q_tilde.dot(iK)

    # quantities for cross-covariance "weights"
    iLamSig = iiLam.dot(Sig_q)  # (D,D)
    r_tilde = (q[na, na, :] * iLamSig[..., na] + mu_q[na, ...] * r[:, na, :]).T.reshape(n * d, d).T  # (D, N*D)
    R_tilde = np.hstack((q[na, :] * mu_q, r_tilde))  # (D, N+N*D)

    # input-output covariance (cross-covariance) "weights"
    Wcc = R_tilde.dot(iK)  # (D, N+N*D)

    # quantities for covariance weights
    zet = 2 * np.log(alpha) - 0.5 * np.sum(inn * inn, 0)  # (D,N) 2log(alpha) - 0.5*(x-m)^T*Lambda^-1*(x-m)
    inn = iiLam.dot(unit_sp)  # inp / el[:, na]**2
    R = 2 * iiLam + eye_d  # 2P*Lambda^-1 + I  # (N,N)
    Q = (1.0 / np.sqrt(det(R))) * np.exp((zet[:, na] + zet[:, na].T) + maha(inn.T, -inn.T, V=0.5 * solve(R, eye_d)))
    cho_LamSig = cho_factor(Lam + Sig_q)
    Sig_Q = cho_solve(cho_LamSig, Sig_q).dot(iiLam)  # (D,D) Lambda^-1 (Lambda*(Lambda+Sig_q)^-1*Sig_q) Lambda^-1
    eta_tilde = iiLam.dot(cho_solve(cho_LamSig, eta))  # Lambda^-1(Lambda+Sig_q)^-1*eta
    ETA = eta_tilde[..., na] + eta_tilde[:, na, :]  # (D,N,N) pairwise sum of pre-multiplied eta's
    # mu_Q = ETA + in_mean[:, na]  # (D,N,N)
    xnmu = inn[..., na] - ETA  # (D,N,N) x_n - mu^Q_nm
    # xmmu = sigmas[:, na, :] - mu_Q  # x_m - mu^Q_nm
    E_dff = (-Q[na, ...] * xnmu).swapaxes(0, 1).reshape(d * n, n)
    # (D,D,N,N) (x_n - mu^Q_nm)(x_m - mu^Q_nm)^T + Sig_Q
    T = xnmu[:, na, ...] * xnmu.swapaxes(1, 2)[na, ...] + Sig_Q[..., na, na]
    E_dffd = (Q[na, na, ...] * T).swapaxes(0, 3).reshape(d * n, -1)  # (N*D, N*D)
    Q_tilde = np.vstack((np.hstack((Q, E_dff.T)), np.hstack((E_dff, E_dffd))))  # (N+N*D, N+N*D)

    # weights for covariance
    iKQ = iK.dot(Q_tilde)
    Wc = iKQ.dot(iK)

    # model variance
    self.model_var = np.diag((alpha ** 2 - np.trace(iKQ)) * np.ones((d, 1)))
    return wm, Wc, Wcc
def predict(self, pred, full_cov=False, compute_grad=False):
    inputs = self.inputs
    values = self.values

    # Special case if there is no data yet (everything from the prior)
    if inputs is None:
        return self.predict_from_prior(pred, full_cov, compute_grad)

    if pred.shape[1] != self.num_dims:
        raise Exception("Dimensionality of inputs must match dimensionality given at init time.")

    # The primary covariances for prediction.
    cand_cross = self.noiseless_kernel.cross_cov(inputs, pred)

    chol, alpha = self._pull_from_cache_or_compute()

    # Solve the linear systems.
    # Note: if X = LL^T, cho_solve performs X\b whereas solve_triangular performs L\b
    beta = spla.solve_triangular(chol, cand_cross, lower=True)

    # Predict the marginal means at candidates.
    func_m = np.dot(cand_cross.T, alpha) + self.mean.value

    if full_cov:
        # Return the covariance matrix of the pred inputs,
        # rather than just the individual variances at each input
        cand_cov = self.noiseless_kernel.cov(pred)
        func_v = cand_cov - np.dot(beta.T, beta)
    else:
        cand_cov = self.noiseless_kernel.diag_cov(pred)
        func_v = cand_cov - np.sum(beta**2, axis=0)

    if not compute_grad:
        return func_m, func_v

    grad_cross = self.noiseless_kernel.cross_cov_grad_data(inputs, pred)
    grad_xp_m = np.tensordot(np.transpose(grad_cross, (1, 2, 0)), alpha, 1)

    # this should be faster than (and equivalent to) spla.cho_solve((chol, True), cand_cross)
    gamma = spla.solve_triangular(chol.T, beta, lower=False)

    # Using elementwise multiplication and summing instead of matrix multiplication
    # because I only want the diagonals of the gradient of the covariance matrix, not the whole thing
    grad_xp_v = -2.0*np.sum(gamma[:, :, np.newaxis] * grad_cross, axis=0)

    # Not very important -- just to make sure grad_xp_v.shape = grad_xp_m.shape
    if values.ndim > 1:
        grad_xp_v = grad_xp_v[:, :, np.newaxis]

    # In case this is a function over a 1D input,
    # return a numpy array rather than a float
    if np.ndim(grad_xp_m) == 0:
        grad_xp_m = np.array([grad_xp_m])
        grad_xp_v = np.array([grad_xp_v])

    return func_m, func_v, grad_xp_m, grad_xp_v
def _posterior_mode(self, K, return_temporaries=False):
    """Mode-finding for binary Laplace GPC and fixed kernel.

    This approximates the posterior of the latent function values for given
    inputs and target observations with a Gaussian approximation and uses
    Newton's iteration to find the mode of this approximation.
    """
    # Based on Algorithm 3.1 of GPML

    # If warm_start is enabled, we reuse the last solution for the
    # posterior mode as initialization; otherwise, we initialize with 0
    if self.warm_start and hasattr(self, "f_cached") \
            and self.f_cached.shape == self.y_train_.shape:
        f = self.f_cached
    else:
        f = np.zeros_like(self.y_train_, dtype=np.float64)

    # Use Newton's iteration method to find mode of Laplace approximation
    log_marginal_likelihood = -np.inf
    for _ in range(self.max_iter_predict):
        # Line 4
        pi = 1 / (1 + np.exp(-f))
        W = pi * (1 - pi)
        # Line 5
        W_sr = np.sqrt(W)
        W_sr_K = W_sr[:, np.newaxis] * K
        B = np.eye(W.shape[0]) + W_sr_K * W_sr
        L = cholesky(B, lower=True)
        # Line 6
        b = W * f + (self.y_train_ - pi)
        # Line 7
        a = b - W_sr * cho_solve((L, True), W_sr_K.dot(b))
        # Line 8
        f = K.dot(a)
        # Line 10: Compute log marginal likelihood in loop and use as
        #          convergence criterion
        lml = -0.5 * a.T.dot(f) \
            - np.log(1 + np.exp(-(self.y_train_ * 2 - 1) * f)).sum() \
            - np.log(np.diag(L)).sum()
        # Check if we have converged (log marginal likelihood does
        # not decrease)
        # XXX: more complex convergence criterion
        if lml - log_marginal_likelihood < 1e-10:
            break
        log_marginal_likelihood = lml

    self.f_cached = f  # Remember solution for later warm-starts
    if return_temporaries:
        return log_marginal_likelihood, (pi, W_sr, L, b, a)
    else:
        return log_marginal_likelihood