The following 50 code examples, extracted from open-source Python projects, illustrate how to use sklearn.utils.check_random_state().
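Before the extracted examples, here is a minimal, self-contained sketch of the usual pattern; the helper make_noisy_signal and the seed values are purely illustrative and not taken from any of the projects below:

import numpy as np
from sklearn.utils import check_random_state

def make_noisy_signal(n_samples, noise=0.1, random_state=None):
    # check_random_state accepts an int seed, an existing RandomState
    # instance, or None, and always returns a RandomState, so downstream
    # sampling is reproducible regardless of what the caller passed in.
    rng = check_random_state(random_state)
    x = np.linspace(0.0, 1.0, n_samples)
    return x + noise * rng.randn(n_samples)

# An int seed and a RandomState seeded with the same value give identical draws.
a = make_noisy_signal(5, random_state=0)
b = make_noisy_signal(5, random_state=np.random.RandomState(0))
assert np.allclose(a, b)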
def test_regression():
    # Check regression for various parameter settings.
    rng = check_random_state(0)
    X_train, X_test, y_train, y_test = train_test_split(boston.data[:50],
                                                        boston.target[:50],
                                                        random_state=rng)
    grid = ParameterGrid({"max_samples": [0.5, 1.0],
                          "max_features": [0.5, 1.0],
                          "bootstrap": [True, False],
                          "bootstrap_features": [True, False]})

    for base_estimator in [None,
                           DummyRegressor(),
                           DecisionTreeRegressor(),
                           KNeighborsRegressor(),
                           SVR()]:
        for params in grid:
            BaggingRegressor(base_estimator=base_estimator,
                             random_state=rng,
                             **params).fit(X_train, y_train).predict(X_test)
def make_3_circles(n_samples, random_state=1):
    random_state = check_random_state(random_state)
    X = np.ones((3 * n_samples, 3))
    Y_plot = np.ones((3 * n_samples, 1))
    X[:n_samples, :2], _ = make_circles(n_samples=n_samples, noise=0.05,
                                        factor=.01, random_state=random_state)
    X[:n_samples, 2] *= -1
    Y_plot[:n_samples, 0] = 1
    X[n_samples:2 * n_samples, :2], _ = make_circles(n_samples=n_samples,
                                                     noise=0.05, factor=.01,
                                                     random_state=random_state)
    X[n_samples:2 * n_samples, 2] = 0
    Y_plot[n_samples:2 * n_samples, 0] = 2
    X[2 * n_samples:, :2], _ = make_circles(n_samples=n_samples, noise=0.05,
                                            factor=.01, random_state=random_state)
    Y_plot[2 * n_samples:, 0] = 3
    # shuffle examples
    idx = random_state.permutation(list(range(3 * n_samples)))
    X, Y_plot = X[idx, :], Y_plot[idx, :]
    # cut to actual size
    X, Y_plot = X[:n_samples, :], Y_plot[:n_samples, :]
    return X, Y_plot
def fit(self, graphs, y=None):
    rnd = check_random_state(self.random_state)
    n_samples = len(graphs)

    # get basis vectors
    if self.n_components > n_samples:
        n_components = n_samples
    else:
        n_components = self.n_components
    n_components = min(n_samples, n_components)
    inds = rnd.permutation(n_samples)
    basis_inds = inds[:n_components]
    basis = []
    for ind in basis_inds:
        basis.append(graphs[ind])

    basis_kernel = self.kernel(basis, basis, **self._get_kernel_params())

    # sqrt of kernel matrix on basis vectors
    U, S, V = svd(basis_kernel)
    S = np.maximum(S, 1e-12)
    self.normalization_ = np.dot(U * 1. / np.sqrt(S), V)
    self.components_ = basis
    self.component_indices_ = inds
    return self
def __init__(self, test_model=False, verify_model=True):
    model = Word2Vec.load(modelfile)

    if(test_model):
        acc = model.accuracy(questionfile)
        logger.info("Test model " + modelfile + " in " + questionfile)

    self.vector_size = model.vector_size
    self.vocab_size = len(model.wv.vocab) + 1
    self.word2index = self.GetWord2Index(model)
    self.index2word = self.GetIndex2Word(model)
    self.wordvector = self.GetWordVector(model)

    if(verify_model):
        logger.info("Verifying imported word2vec model")
        random_state = check_random_state(12)
        check_index = random_state.randint(low=0, high=self.vocab_size-2, size=1000)
        for index in check_index:
            word_wv = model.wv.index2word[index]
            word_our = self.index2word[index+1]
            #print(index, word_wv, word_our)
            assert word_wv == word_our
            assert model.wv.vocab[word_our].index == self.word2index[word_our] - 1
            assert np.array_equal(model.wv[word_our], self.wordvector[self.word2index[word_our]])
        logger.info("Imported word2vec model is verified")
def __init__(self, n_components=256, learning_rate=0.1, batch_size=10,
             n_iter=10, verbose=0, random_state=None, lr_backoff=False,
             weight_cost=0):
    self.n_components = n_components
    self.base_learning_rate = learning_rate
    self.learning_rate = learning_rate
    self.lr_backoff = lr_backoff
    self.batch_size = batch_size
    self.n_iter = n_iter
    self.verbose = verbose
    self.random_state = random_state
    self.rng_ = check_random_state(self.random_state)
    self.weight_cost = weight_cost

    # A history of some summary statistics recorded at the end of each
    # epoch of training. Each key maps to a 2-d array. One row per
    # 'session', one value per epoch. (Another session means this model
    # was pickled, then loaded and fit again.)
    self.history = {'pseudo-likelihood': [], 'overfit': []}

    # TODO
    # Experimental: How many times more fantasy particles compared to minibatch size
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.tree

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    for train, test in kf:
        random_state = check_random_state(42)
        tree = sklearn.tree.DecisionTreeClassifier(random_state=random_state)

        if len(y.shape) == 1 or y.shape[1] == 1:
            tree.fit(X[train], y[train])
        else:
            tree = OneVsRestClassifier(tree)
            tree.fit(X[train], y[train])

        predictions = tree.predict(X[test])
        accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
    return accuracy / 10
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.tree

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    for train, test in kf:
        random_state = check_random_state(42)
        node = sklearn.tree.DecisionTreeClassifier(
            criterion="entropy", max_depth=1, random_state=random_state,
            min_samples_split=1, min_samples_leaf=1, max_features=None)

        if len(y.shape) == 1 or y.shape[1] == 1:
            node.fit(X[train], y[train])
        else:
            node = OneVsRestClassifier(node)
            node.fit(X[train], y[train])

        predictions = node.predict(X[test])
        accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
    return accuracy / 10
def _calculate(self, X, y, categorical, metafeatures, helpers):
    import sklearn.tree

    if len(y.shape) == 1 or y.shape[1] == 1:
        kf = cross_validation.StratifiedKFold(y, n_folds=10)
    else:
        kf = cross_validation.KFold(y.shape[0], n_folds=10)

    accuracy = 0.
    for train, test in kf:
        random_state = check_random_state(42)
        node = sklearn.tree.DecisionTreeClassifier(
            criterion="entropy", max_depth=1, random_state=random_state,
            min_samples_split=1, min_samples_leaf=1, max_features=1)

        if len(y.shape) == 1 or y.shape[1] == 1:
            node.fit(X[train], y[train])
        else:
            node = OneVsRestClassifier(node)
            node.fit(X[train], y[train])

        predictions = node.predict(X[test])
        accuracy += sklearn.metrics.accuracy_score(predictions, y[test])
    return accuracy / 10
def __init__(self, activation='relu', learning_rate_init=0.001,
             learning_rule='const', lamb=0.0, max_iter=200,
             num_hidden_nodes=[8], num_hidden_layers=1, momentum=0.9,
             beta=0.0, ro0=0.05, shuffle=True, batch_size="auto",
             random_state=None):
    # self.nonlinear = (sigmoid, dsigmoid)
    self.activation = activation
    self.learning_rate_init = learning_rate_init
    self.lamb = lamb
    self.max_iter = max_iter
    self.num_input_nodes = None
    self.num_res_nodes = None
    self.shuffle = shuffle
    self.batch_size = batch_size
    if not isinstance(num_hidden_nodes, list):
        raise TypeError('must be a list!')
    self.num_hidden_nodes = num_hidden_nodes
    self.num_hidden_layers = num_hidden_layers
    self.n_layers_ = num_hidden_layers + 2
    self._random_state = check_random_state(random_state)
    self.ww = None
    self.th = None
def __init__(self, activation='logistic', learning_rate_init=0.03,
             learning_rule='const', lamb=0.0, max_iter=200,
             num_hidden_nodes=[100], num_hidden_layers=1, momentum=0.9,
             beta=0.0, ro0=0.05, shuffle=True, random_state=None):
    # self.nonlinear = (sigmoid, dsigmoid)
    self.activation = activation
    self.learning_rate_init = learning_rate_init
    self.lamb = lamb
    self.momentum = momentum
    self.beta = beta
    self.ro0 = ro0
    self.max_iter = max_iter
    self.num_input_nodes = None
    self.num_res_nodes = None
    self.shuffle = shuffle
    if not isinstance(num_hidden_nodes, list):
        raise TypeError('must be a list!')
    self.num_hidden_nodes = num_hidden_nodes
    self.num_hidden_layers = num_hidden_layers
    self.layers_ = num_hidden_layers + 2
    self._random_state = check_random_state(random_state)
    self.ww = None
    self.th = None
    self.predict_ = None
def test_dict_mf_reconstruction_sparse_dict(solver):
    X, Q = generate_sparse_synthetic(500, 4)
    rng = check_random_state(0)
    dict_init = Q + rng.randn(*Q.shape) * 0.2
    dict_mf = DictFact(n_components=4, code_alpha=1e-2, n_epochs=2,
                       code_l1_ratio=0, comp_l1_ratio=1,
                       dict_init=dict_init,
                       G_agg=solver_dict[solver]['G_agg'],
                       Dx_agg=solver_dict[solver]['Dx_agg'],
                       random_state=rng_global)
    dict_mf.fit(X)
    Q_rec = dict_mf.components_
    Q_rec /= np.sqrt(np.sum(Q_rec ** 2, axis=1))[:, np.newaxis]
    Q /= np.sqrt(np.sum(Q ** 2, axis=1))[:, np.newaxis]
    G = np.abs(Q_rec.dot(Q.T))
    recovered_maps = min(np.sum(np.any(G > 0.95, axis=1)),
                         np.sum(np.any(G > 0.95, axis=0)))
    assert (recovered_maps >= 4)
def enet_regression_multi_gram_(G, Dx, X, code, l1_ratio, alpha, positive):
    batch_size = code.shape[0]
    if l1_ratio == 0:
        n_components = G.shape[1]
        for i in range(batch_size):
            G.flat[::n_components + 1] += alpha
            code[i] = linalg.solve(G[i], Dx[i])
            G.flat[::n_components + 1] -= alpha
    else:
        # Unused but unfortunate API
        random_state = check_random_state(0)
        for i in range(batch_size):
            cd_fast.enet_coordinate_descent_gram(
                code[i], alpha * l1_ratio, alpha * (1 - l1_ratio),
                G[i], Dx[i], X[i], 100, 1e-2, random_state, False, positive)
    return code
def fit(self, X, y=None):
    self.random_state = check_random_state(self.random_state)
    i_h, i_w, n_channels = X.shape
    if self.patch_size is None:
        patch_size = i_h // 10, i_w // 10
    else:
        patch_size = self.patch_size
    patch_shape = (patch_size[0], patch_size[1], n_channels)
    self.patches_ = extract_patches(X, patch_shape=patch_shape)
    clean = np.all(X != -1)
    if not clean:
        self.indices_3d = clean_mask(self.patches_, X)
    else:
        self.indices_3d = fill(*self.patches_.shape[:3])
    n_samples = self.indices_3d.shape[0]
    selection = self.random_state.permutation(n_samples)[:self.max_patches]
    self.indices_3d = self.indices_3d[selection]
    return self
def test_scale_patches():
    patch_size = (8, 8, 3)
    n = 100
    shape = (n, ) + patch_size
    rs = check_random_state(0)
    X = rs.randn(*shape)
    Y = scale_patches(X, with_mean=True, with_std=True, channel_wise=True)
    assert_array_almost_equal(Y.mean(axis=(1, 2)), 0)
    assert_array_almost_equal(np.sum(Y ** 2, axis=(1, 2)), 1 / 3)
    scale_patches(X, with_mean=True, with_std=True, channel_wise=True, copy=False)
    assert_array_equal(X, Y)

    X = rs.randn(*shape)
    Y = scale_patches(X, with_mean=False, with_std=True, channel_wise=True)
    assert_array_almost_equal(np.sum(Y ** 2, axis=(1, 2)), 1 / 3)
    Y = scale_patches(X, with_mean=True, with_std=False, channel_wise=True)
    assert_array_almost_equal(Y.mean(axis=(1, 2)), 0)

    Y = scale_patches(X, with_mean=True, with_std=True, channel_wise=False)
    assert_array_almost_equal(Y.mean(axis=(1, 2, 3)), 0)
    assert_array_almost_equal(np.sum(Y ** 2, axis=(1, 2, 3)), 1)
def run(n_seeds, n_jobs, _run, _seed):
    seed_list = check_random_state(_seed).randint(np.iinfo(np.uint32).max,
                                                  size=n_seeds)
    exps = []
    exps += [{'method': 'sgd', 'step_size': step_size}
             for step_size in np.logspace(-3, 3, 7)]
    exps += [{'method': 'gram', 'reduction': reduction}
             for reduction in [1, 4, 6, 8, 12, 24]]

    rundir = join(basedir, str(_run._id), 'run')
    if not os.path.exists(rundir):
        os.makedirs(rundir)

    Parallel(n_jobs=n_jobs, verbose=10)(
        delayed(single_run)(config_updates, rundir, i)
        for i, config_updates in enumerate(exps))
def _init(self, X, lengths=None):
    if not self._check_input_symbols(X):
        raise ValueError("expected a sample from "
                         "a Multinomial distribution.")

    super(MultinomialHMM, self)._init(X, lengths=lengths)
    self.random_state = check_random_state(self.random_state)

    if 'e' in self.init_params:
        if not hasattr(self, "n_features"):
            symbols = set()
            for i, j in iter_from_X_lengths(X, lengths):
                symbols |= set(X[i:j].flatten())
            self.n_features = len(symbols)
        self.emissionprob_ = self.random_state \
            .rand(self.n_components, self.n_features)
        normalize(self.emissionprob_, axis=1)
def _generate_sample_from_state(self, state, random_state=None):
    if random_state is None:
        random_state = self.random_state
    random_state = check_random_state(random_state)

    cur_means = self.means_[state]
    cur_covs = self.covars_[state]
    cur_weights = self.weights_[state]

    i_gauss = random_state.choice(self.n_mix, p=cur_weights)
    mean = cur_means[i_gauss]
    if self.covariance_type == 'tied':
        cov = cur_covs
    else:
        cov = cur_covs[i_gauss]

    return sample_gaussian(mean, cov, self.covariance_type,
                           random_state=random_state)
def sample_blobs(n, ratio, rows=5, cols=5, sep=10, rs=None):
    rs = check_random_state(rs)
    # ratio is eigenvalue ratio
    correlation = (ratio - 1) / (ratio + 1)

    # generate within-blob variation
    mu = np.zeros(2)
    sigma = np.eye(2)
    X = rs.multivariate_normal(mu, sigma, size=n)
    corr_sigma = np.array([[1, correlation], [correlation, 1]])
    Y = rs.multivariate_normal(mu, corr_sigma, size=n)

    # assign to blobs
    X[:, 0] += rs.randint(rows, size=n) * sep
    X[:, 1] += rs.randint(cols, size=n) * sep
    Y[:, 0] += rs.randint(rows, size=n) * sep
    Y[:, 1] += rs.randint(cols, size=n) * sep

    return X, Y


################################################################################
### Sample images from GANs
def generate_sample_indices(random_state, n_samples):
    """
    Generates bootstrap indices for each tree fit.

    Parameters
    ----------
    random_state: int, RandomState instance or None
        If int, random_state is the seed used by the random number generator.
        If RandomState instance, random_state is the random number generator.
        If None, the random number generator is the RandomState instance used
        by np.random.

    n_samples: int
        Number of samples to generate from each tree.

    Returns
    -------
    sample_indices: array-like, shape=(n_samples), dtype=np.int32
        Sample indices.
    """
    random_instance = check_random_state(random_state)
    sample_indices = random_instance.randint(0, n_samples, n_samples)

    return sample_indices
def _init_weights(self, n_features):
    """Initialize the parameter weights."""
    rng = check_random_state(self.random_state)

    # Use the initialization method recommended by Glorot et al.
    weight_init_bound = np.sqrt(6. / (n_features + self.n_hidden))
    self.coef_hidden_ = rng.uniform(-weight_init_bound, weight_init_bound,
                                    (n_features, self.n_hidden))
    self.intercept_hidden_ = rng.uniform(-weight_init_bound, weight_init_bound,
                                         self.n_hidden)
    if self.weight_scale != 1:
        self.coef_hidden_ *= self.weight_scale
        self.intercept_hidden_ *= self.weight_scale
def test_multiprocessing():
    generator = check_random_state(0)
    data = genData(n_samples=200, n_features=4, n_redundant=2, strRel=2,
                   n_repeated=0, class_sep=1, flip_y=0, random_state=generator)
    X_orig, y = data
    X_orig = StandardScaler().fit(X_orig).transform(X_orig)
    X = np.c_[X_orig, generator.normal(size=(len(X_orig), 6))]
    y = list(y)   # regression test: list should be supported

    # Test using the score function
    fri = EnsembleFRI(FRIClassification(random_state=generator),
                      n_bootstraps=5, n_jobs=2, random_state=generator)
    fri.fit(X, y)

    # non-regression test for missing worst feature:
    assert len(fri.allrel_prediction_) == X.shape[1]
    assert len(fri.interval_) == X.shape[1]

    # All strongly relevant features have a lower bound > 0
    assert np.all(fri.interval_[0:2, 0] > 0)
    # All weakly relevant features should have a lower bound 0
    assert np.any(fri.interval_[2:4, 0] > 0) == False
def test_shape():
    n = 100
    d = 10
    strRel = 2
    generator = check_random_state(1337)
    X, Y = genData.genRegressionData(n_samples=n, n_features=d, n_redundant=0,
                                     strRel=strRel, n_repeated=0,
                                     random_state=generator)
    assert X.shape == (n, d)

    X, Y = genData.genRegressionData(n_samples=n, n_features=d, n_redundant=2,
                                     strRel=strRel, n_repeated=1,
                                     random_state=generator)
    assert X.shape == (n, d)

    X, Y = genData.genRegressionData(n_samples=n, n_features=d, n_redundant=2,
                                     strRel=0, n_repeated=1,
                                     random_state=generator)
    assert X.shape == (n, d)
def __init__(self, lambda_, n_thresholds, max_depth, alpha, beta, tau,
             n_fringe=None, bias=None, uniform=False, batch=False,
             random_state=None):
    self.lambda_ = lambda_
    self.n_features = None
    self.n_thresholds = n_thresholds
    self.max_depth = 2**31 - 1 if max_depth is None else max_depth
    self.alpha = alpha
    self.beta = beta
    self.tau = tau
    self.n_fringe = 2**31 - 1 if n_fringe is None else n_fringe
    self.bias = 0.0 if bias is None else bias
    self.uniform = uniform
    self.batch = batch
    self.random_state = check_random_state(random_state)
    self.tree_ = Tree()
    self.grower = None
def __init__(self, name, classifier=None, number_gen=20, verbose=0, repeat=1,
             parallel=False, make_logbook=False, random_state=None,
             cv_metric_fuction=make_scorer(matthews_corrcoef),
             features_metric_function=None):
    self._name = name
    self.estimator = SVC(kernel='linear', max_iter=10000) if classifier is None else clone(classifier)
    self.number_gen = number_gen
    self.verbose = verbose
    self.repeat = repeat
    self.parallel = parallel
    self.make_logbook = make_logbook
    self.random_state = random_state
    self.cv_metric_function = cv_metric_fuction
    self.features_metric_function = features_metric_function
    self._random_object = check_random_state(self.random_state)
    random.seed(self.random_state)
def make_data(random_state, n_samples_per_center, grid_size, scale):
    random_state = check_random_state(random_state)
    centers = np.array([[i, j]
                        for i in range(grid_size)
                        for j in range(grid_size)])
    n_clusters_true, n_features = centers.shape

    noise = random_state.normal(
        scale=scale, size=(n_samples_per_center, centers.shape[1]))

    X = np.concatenate([c + noise for c in centers])
    y = np.concatenate([[i] * n_samples_per_center
                        for i in range(n_clusters_true)])
    return shuffle(X, y, random_state=random_state)


# Part 1: Quantitative evaluation of various init methods
def plot(func):
    random_state = check_random_state(0)
    one_core = []
    multi_core = []
    sample_sizes = range(1000, 6000, 1000)

    for n_samples in sample_sizes:
        X = random_state.rand(n_samples, 300)

        start = time.time()
        func(X, n_jobs=1)
        one_core.append(time.time() - start)

        start = time.time()
        func(X, n_jobs=-1)
        multi_core.append(time.time() - start)

    pl.figure('scikit-learn parallel %s benchmark results' % func.__name__)
    pl.plot(sample_sizes, one_core, label="one core")
    pl.plot(sample_sizes, multi_core, label="multi core")
    pl.xlabel('n_samples')
    pl.ylabel('Time (s)')
    pl.title('Parallel %s' % func.__name__)
    pl.legend()
def test_classification():
    # Check classification for various parameter settings.
    rng = check_random_state(0)
    X_train, X_test, y_train, y_test = train_test_split(iris.data,
                                                        iris.target,
                                                        random_state=rng)
    grid = ParameterGrid({"max_samples": [0.5, 1.0],
                          "max_features": [1, 2, 4],
                          "bootstrap": [True, False],
                          "bootstrap_features": [True, False]})

    for base_estimator in [None,
                           DummyClassifier(),
                           Perceptron(),
                           DecisionTreeClassifier(),
                           KNeighborsClassifier(),
                           SVC()]:
        for params in grid:
            BaggingClassifier(base_estimator=base_estimator,
                              random_state=rng,
                              **params).fit(X_train, y_train).predict(X_test)
def test_single_estimator():
    # Check singleton ensembles.
    rng = check_random_state(0)
    X_train, X_test, y_train, y_test = train_test_split(boston.data,
                                                        boston.target,
                                                        random_state=rng)

    clf1 = BaggingRegressor(base_estimator=KNeighborsRegressor(),
                            n_estimators=1,
                            bootstrap=False,
                            bootstrap_features=False,
                            random_state=rng).fit(X_train, y_train)

    clf2 = KNeighborsRegressor().fit(X_train, y_train)

    assert_array_equal(clf1.predict(X_test), clf2.predict(X_test))
def test_iforest_parallel_regression():
    """Check parallel regression."""
    rng = check_random_state(0)

    X_train, X_test, y_train, y_test = train_test_split(boston.data,
                                                        boston.target,
                                                        random_state=rng)

    ensemble = IsolationForest(n_jobs=3, random_state=0).fit(X_train)

    ensemble.set_params(n_jobs=1)
    y1 = ensemble.predict(X_test)
    ensemble.set_params(n_jobs=2)
    y2 = ensemble.predict(X_test)
    assert_array_almost_equal(y1, y2)

    ensemble = IsolationForest(n_jobs=1, random_state=0).fit(X_train)

    y3 = ensemble.predict(X_test)
    assert_array_almost_equal(y1, y3)
def test_iforest_performance():
    """Test Isolation Forest performs well"""

    # Generate train/test data
    rng = check_random_state(2)
    X = 0.3 * rng.randn(120, 2)
    X_train = np.r_[X + 2, X - 2]
    X_train = X[:100]

    # Generate some abnormal novel observations
    X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))
    X_test = np.r_[X[100:], X_outliers]
    y_test = np.array([0] * 20 + [1] * 20)

    # fit the model
    clf = IsolationForest(max_samples=100, random_state=rng).fit(X_train)

    # predict scores (the lower, the more normal)
    y_pred = clf.predict(X_test)

    # check that there is at most 6 errors (false positive or false negative)
    assert_greater(roc_auc_score(y_test, y_pred), 0.98)
def test_errors_and_values_helper():
    ridgecv = _RidgeGCV()
    rng = check_random_state(42)
    alpha = 1.
    n = 5
    y = rng.randn(n)
    v = rng.randn(n)
    Q = rng.randn(len(v), len(v))
    QT_y = Q.T.dot(y)
    G_diag, c = ridgecv._errors_and_values_helper(alpha, y, v, Q, QT_y)

    # test that helper function behaves as expected
    out, c_ = ridgecv._errors(alpha, y, v, Q, QT_y)
    np.testing.assert_array_equal(out, (c / G_diag) ** 2)
    np.testing.assert_array_equal(c, c_)

    out, c_ = ridgecv._values(alpha, y, v, Q, QT_y)
    np.testing.assert_array_equal(out, y - (c / G_diag))
    np.testing.assert_array_equal(c_, c)
def test_errors_and_values_svd_helper():
    ridgecv = _RidgeGCV()
    rng = check_random_state(42)
    alpha = 1.
    for n, p in zip((5, 10), (12, 6)):
        y = rng.randn(n)
        v = rng.randn(p)
        U = rng.randn(n, p)
        UT_y = U.T.dot(y)
        G_diag, c = ridgecv._errors_and_values_svd_helper(alpha, y, v, U, UT_y)

        # test that helper function behaves as expected
        out, c_ = ridgecv._errors_svd(alpha, y, v, U, UT_y)
        np.testing.assert_array_equal(out, (c / G_diag) ** 2)
        np.testing.assert_array_equal(c, c_)

        out, c_ = ridgecv._values_svd(alpha, y, v, U, UT_y)
        np.testing.assert_array_equal(out, y - (c / G_diag))
        np.testing.assert_array_equal(c_, c)
def generate_toy_data(n_components, n_samples, image_size, random_state=None):
    n_features = image_size[0] * image_size[1]

    rng = check_random_state(random_state)
    U = rng.randn(n_samples, n_components)
    V = rng.randn(n_components, n_features)

    centers = [(3, 3), (6, 7), (8, 1)]
    sz = [1, 2, 1]
    for k in range(n_components):
        img = np.zeros(image_size)
        xmin, xmax = centers[k][0] - sz[k], centers[k][0] + sz[k]
        ymin, ymax = centers[k][1] - sz[k], centers[k][1] + sz[k]
        img[xmin:xmax][:, ymin:ymax] = 1.0
        V[k, :] = img.ravel()

    # Y is defined by : Y = UV + noise
    Y = np.dot(U, V)
    Y += 0.1 * rng.randn(Y.shape[0], Y.shape[1])  # Add noise
    return Y, U, V


# SparsePCA can be a bit slow. To avoid having test times go up, we
# test different aspects of the code in the same test
def test_binary_perplexity_stability():
    # Binary perplexity search should be stable.
    # The binary_search_perplexity had a bug wherein the P array
    # was uninitialized, leading to sporadically failing tests.
    k = 10
    n_samples = 100
    random_state = check_random_state(0)
    distances = random_state.randn(n_samples, 2).astype(np.float32)
    # Distances shouldn't be negative
    distances = np.abs(distances.dot(distances.T))
    np.fill_diagonal(distances, 0.0)
    last_P = None
    neighbors_nn = np.argsort(distances, axis=1)[:, :k].astype(np.int64)
    for _ in range(100):
        P = _binary_search_perplexity(distances.copy(), neighbors_nn.copy(),
                                      3, verbose=0)
        P1 = _joint_probabilities_nn(distances, neighbors_nn, 3, verbose=0)
        if last_P is None:
            last_P = P
            last_P1 = P1
        else:
            assert_array_almost_equal(P, last_P, decimal=4)
            assert_array_almost_equal(P1, last_P1, decimal=4)
def test_gradient():
    # Test gradient of Kullback-Leibler divergence.
    random_state = check_random_state(0)

    n_samples = 50
    n_features = 2
    n_components = 2
    alpha = 1.0

    distances = random_state.randn(n_samples, n_features).astype(np.float32)
    distances = distances.dot(distances.T)
    np.fill_diagonal(distances, 0.0)
    X_embedded = random_state.randn(n_samples, n_components)

    P = _joint_probabilities(distances, desired_perplexity=25.0, verbose=0)
    fun = lambda params: _kl_divergence(params, P, alpha, n_samples,
                                        n_components)[0]
    grad = lambda params: _kl_divergence(params, P, alpha, n_samples,
                                         n_components)[1]
    assert_almost_equal(check_grad(fun, grad, X_embedded.ravel()), 0.0,
                        decimal=5)
def test_preserve_trustworthiness_approximately():
    # Nearest neighbors should be preserved approximately.
    random_state = check_random_state(0)
    # The Barnes-Hut approximation uses a different method to estimate
    # P_ij using only a number of nearest neighbors instead of all
    # points (so that k = 3 * perplexity). As a result we set the
    # perplexity=5, so that the number of neighbors is 5%.
    n_components = 2
    methods = ['exact', 'barnes_hut']
    X = random_state.randn(100, n_components).astype(np.float32)
    for init in ('random', 'pca'):
        for method in methods:
            tsne = TSNE(n_components=n_components, perplexity=50,
                        learning_rate=100.0, init=init, random_state=0,
                        method=method)
            X_embedded = tsne.fit_transform(X)
            T = trustworthiness(X, X_embedded, n_neighbors=1)
            assert_almost_equal(T, 1.0, decimal=1)
def test_verbose():
    # Verbose options write to stdout.
    random_state = check_random_state(0)
    tsne = TSNE(verbose=2)
    X = random_state.randn(5, 2)

    old_stdout = sys.stdout
    sys.stdout = StringIO()
    try:
        tsne.fit_transform(X)
    finally:
        out = sys.stdout.getvalue()
        sys.stdout.close()
        sys.stdout = old_stdout

    assert("[t-SNE]" in out)
    assert("Computing pairwise distances" in out)
    assert("Computed conditional probabilities" in out)
    assert("Mean sigma" in out)
    assert("Finished" in out)
    assert("early exaggeration" in out)
    assert("Finished" in out)
def test_arpack_eigsh_initialization():
    # Non-regression test that shows null-space computation is better with
    # initialization of eigsh from [-1,1] instead of [0,1]
    random_state = check_random_state(42)

    A = random_state.rand(50, 50)
    A = np.dot(A.T, A)  # create s.p.d. matrix
    A = graph_laplacian(A) + 1e-7 * np.identity(A.shape[0])
    k = 5

    # Test if eigsh is working correctly
    # New initialization [-1,1] (as in original ARPACK)
    # Was [0,1] before, with which this test could fail
    v0 = random_state.uniform(-1, 1, A.shape[0])
    w, _ = eigsh(A, k=k, sigma=0.0, v0=v0)

    # Eigenvalues of s.p.d. matrix should be nonnegative, w[0] is smallest
    assert_greater_equal(w[0], 0)
def test_rfe_features_importance():
    generator = check_random_state(0)
    iris = load_iris()
    X = np.c_[iris.data, generator.normal(size=(len(iris.data), 6))]
    y = iris.target

    clf = RandomForestClassifier(n_estimators=20,
                                 random_state=generator, max_depth=2)
    rfe = RFE(estimator=clf, n_features_to_select=4, step=0.1)
    rfe.fit(X, y)
    assert_equal(len(rfe.ranking_), X.shape[1])

    clf_svc = SVC(kernel="linear")
    rfe_svc = RFE(estimator=clf_svc, n_features_to_select=4, step=0.1)
    rfe_svc.fit(X, y)

    # Check if the supports are equal
    assert_array_equal(rfe.get_support(), rfe_svc.get_support())
def test_oneclass_decision_function():
    # Test OneClassSVM decision function
    clf = svm.OneClassSVM()
    rnd = check_random_state(2)

    # Generate train data
    X = 0.3 * rnd.randn(100, 2)
    X_train = np.r_[X + 2, X - 2]

    # Generate some regular novel observations
    X = 0.3 * rnd.randn(20, 2)
    X_test = np.r_[X + 2, X - 2]
    # Generate some abnormal novel observations
    X_outliers = rnd.uniform(low=-4, high=4, size=(20, 2))

    # fit the model
    clf = svm.OneClassSVM(nu=0.1, kernel="rbf", gamma=0.1)
    clf.fit(X_train)

    # predict things
    y_pred_test = clf.predict(X_test)
    assert_greater(np.mean(y_pred_test == 1), .9)
    y_pred_outliers = clf.predict(X_outliers)
    assert_greater(np.mean(y_pred_outliers == -1), .9)
    dec_func_test = clf.decision_function(X_test)
    assert_array_equal((dec_func_test > 0).ravel(), y_pred_test == 1)
    dec_func_outliers = clf.decision_function(X_outliers)
    assert_array_equal((dec_func_outliers > 0).ravel(), y_pred_outliers == 1)
def test_graph_lasso_cv(random_state=1):
    # Sample data from a sparse multivariate normal
    dim = 5
    n_samples = 6
    random_state = check_random_state(random_state)
    prec = make_sparse_spd_matrix(dim, alpha=.96,
                                  random_state=random_state)
    cov = linalg.inv(prec)
    X = random_state.multivariate_normal(np.zeros(dim), cov, size=n_samples)
    # Capture stdout, to smoke test the verbose mode
    orig_stdout = sys.stdout
    try:
        sys.stdout = StringIO()
        # We need verbose very high so that Parallel prints on stdout
        GraphLassoCV(verbose=100, alphas=5, tol=1e-1).fit(X)
    finally:
        sys.stdout = orig_stdout

    # Smoke test with specified alphas
    GraphLassoCV(alphas=[0.8, 0.5], tol=1e-1, n_jobs=1).fit(X)
def _iter_indices(self, frame, y=None):
    """Iterate the indices.

    Parameters
    ----------
    frame : H2OFrame
        The frame to split

    y : string, optional (default=None)
        The column to stratify. Since this class does not
        perform stratification, ``y`` is unused.

    Returns
    -------
    ind_train : np.ndarray, shape=(n_samples,)
        The train indices

    ind_test : np.ndarray, shape=(n_samples,)
        The test indices
    """
    n_samples = frame.shape[0]
    n_train, n_test = _validate_shuffle_split(n_samples,
                                              self.test_size, self.train_size)

    rng = check_random_state(self.random_state)
    for i in range(self.n_splits):
        permutation = rng.permutation(n_samples)
        ind_test = permutation[:n_test]
        ind_train = permutation[n_test:(n_test + n_train)]
        yield ind_train, ind_test
def _iter_test_indices(self, frame, y=None):
    n_obs = frame.shape[0]
    indices = np.arange(n_obs)
    if self.shuffle:
        check_random_state(self.random_state).shuffle(indices)

    n_folds = self.n_folds
    fold_sizes = (n_obs // n_folds) * np.ones(n_folds, dtype=np.int)
    fold_sizes[:n_obs % n_folds] += 1

    current = 0
    for fold_size in fold_sizes:
        start, stop = current, current + fold_size
        yield indices[start:stop]
        current = stop
def __init__(self, problem, w_star, min_regret=0, noise=0, rng=None):
    self.problem = problem
    self.w_star = w_star
    self.min_regret = min_regret
    self.noise = noise
    self.rng = check_random_state(rng)

    self.x_star = self.problem.infer(self.w_star)
    self.u_star = self.utility(self.x_star)
def run(args):
    """Runs an experiment over several groups of users.

    It takes care of sampling the user groups (or loading them from disk,
    if available), calling the actual MUSM algorithm on each group, and
    dumping the results to file.
    """
    problem = PROBLEMS[args['problem']]()

    try:
        groups = musm.load(args['groups'])
    except:
        groups = sample_groups(problem, musm.subdict(args, nokeys={'problem'}))
        if args['groups'] is not None:
            musm.dump(args['groups'], groups)

    rng = check_random_state(args['seed'])

    traces = []
    for gid in range(args['num_groups']):
        traces.append(musm.musm(problem,
                                groups[gid],
                                set_size=args['set_size'],
                                max_iters=args['max_iters'],
                                enable_cv=args['enable_cv'],
                                pick=args['pick'],
                                transform=args['transform'],
                                lmbda=args['lmbda'],
                                tau=args['tau'],
                                rng=0))

    musm.dump(get_results_path(args), {'args': args, 'traces': traces})
def check_random_state(seed):
    """Turn seed into a np.random.RandomState instance."""
    if seed is None or seed is np.random:
        return np.random.mtrand._rand
    if isinstance(seed, (numbers.Integral, np.integer)):
        return np.random.RandomState(seed)
    if isinstance(seed, np.random.RandomState):
        return seed
    raise ValueError('%r cannot be used to seed a numpy.random.RandomState'
                     ' instance' % seed)