The following 50 code examples, extracted from open-source Python projects, illustrate how to use sklearn.utils.validation.check_array().
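Before the project examples, here is a minimal, self-contained sketch of the pattern most of these snippets follow: validate the input with check_array() in fit(), store fitted attributes, then call check_is_fitted() and re-validate in predict(). The estimator name and its attributes below are illustrative only, not taken from any of the projects listed.

import numpy as np
from sklearn.base import BaseEstimator
from sklearn.utils.validation import check_array, check_is_fitted

class MeanPredictor(BaseEstimator):
    """Toy estimator illustrating the check_array()/check_is_fitted() idiom."""

    def fit(self, X, y=None):
        # coerce X to a 2D numeric array; NaN/inf are rejected by default
        X = check_array(X)
        self.mean_ = X.mean(axis=0)  # fitted attributes end with a trailing underscore
        return self

    def predict(self, X):
        # raises NotFittedError if fit() has not been called
        check_is_fitted(self, 'mean_')
        # validate again at predict time; the feature count must match fit
        X = check_array(X)
        if X.shape[1] != self.mean_.shape[0]:
            raise ValueError("X has a different number of features than seen in fit")
        return np.full(X.shape[0], self.mean_.sum())

# usage: MeanPredictor().fit([[1, 2], [3, 4]]).predict([[5, 6]])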
def decision_function(self, X):
    """Compute the distances to the nearest centroid for an array of test
    vectors X.

    Parameters
    ----------
    X : array-like, shape = [n_samples, n_features]

    Returns
    -------
    C : array, shape = [n_samples]
    """
    from sklearn.metrics.pairwise import pairwise_distances
    from sklearn.utils.validation import check_array, check_is_fitted

    check_is_fitted(self, 'centroids_')

    X = check_array(X, accept_sparse='csr')
    return pairwise_distances(X, self.centroids_, metric=self.metric).min(axis=1)
def from_array(X, column_names=None):
    """A simple wrapper for H2OFrame.from_python. This takes a numpy array
    (or 2d array) and returns an H2OFrame with all the default args.

    Parameters
    ----------
    X : ndarray
        The array to convert.

    column_names : list, tuple (default=None)
        the names to use for your columns

    Returns
    -------
    H2OFrame
    """
    X = check_array(X, force_all_finite=False)
    return from_pandas(pd.DataFrame.from_records(data=X, columns=column_names))
def transform(self, X):
    check_is_fitted(self, ['statistics_', 'estimators_', 'gamma_'])

    X = check_array(X, copy=True, dtype=np.float64, force_all_finite=False)

    if X.shape[1] != self.statistics_.shape[1]:
        raise ValueError("X has %d features per sample, expected %d"
                         % (X.shape[1], self.statistics_.shape[1]))

    X_nan = np.isnan(X)
    imputed = self.initial_imputer.fit_transform(X)

    if len(self.estimators_) > 1:
        for i, estimator_ in enumerate(self.estimators_):
            X_s = np.delete(imputed, i, 1)
            y_nan = X_nan[:, i]

            X_unk = X_s[y_nan]
            if len(X_unk) > 0:
                X[y_nan, i] = estimator_.predict(X_unk)
    else:
        estimator_ = self.estimators_[0]
        X[X_nan] = estimator_.inverse_transform(estimator_.transform(imputed))[X_nan]

    return X
def predict(self, X): """Applies learned event segmentation to new testing dataset Alternative function for segmenting a new dataset after using fit() to learn a sequence of events, to comply with the sklearn Classifier interface Parameters ---------- X: timepoint by voxel ndarray fMRI data to segment based on previously-learned event patterns Returns ------- Event label for each timepoint """ check_is_fitted(self, ["event_pat_", "event_var_"]) X = check_array(X) segments, test_ll = self.find_events(X) return np.argmax(segments, axis=1)
def transform(self, X): """Scaling features of X according to feature_range. Parameters ---------- X : array-like with shape [n_samples, n_features] Input data that will be transformed. """ check_is_fitted(self, 'scale_') X = check_array(X, accept_sparse="csc", copy=self.copy, dtype=np.float32) if sparse.issparse(X): for i in range(X.shape[1]): X.data[X.indptr[i]:X.indptr[i + 1]] *= self.scale_[i] X.data[X.indptr[i]:X.indptr[i + 1]] += self.min_[i] else: X *= self.scale_ X += self.min_ return X
def predict(self, X): """ A reference implementation of a prediction for a classifier. Parameters ---------- X : array-like of shape = [n_samples, n_features] The input samples. Returns ------- y : array of int of shape = [n_samples] The label for each sample is the label of the closest sample seen udring fit. """ # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) closest = np.argmin(euclidean_distances(X, self.X_), axis=1) return self.y_[closest]
def fit(self, X, y=None): """A reference implementation of a fitting function for a transformer. Parameters ---------- X : array-like or sparse matrix of shape = [n_samples, n_features] The training input samples. y : None There is no need of a target in a transformer, yet the pipeline API requires this parameter. Returns ------- self : object Returns self. """ X = check_array(X) self.input_shape_ = X.shape # Return the transformer return self
def predict_proba(self, X, X2):
    """Returns the probability of class 1 for each x in X."""
    try:
        getattr(self, "intercept1_")
        getattr(self, "intercept2_")
        getattr(self, "coef1_")
        getattr(self, "coef2_")
    except AttributeError:
        raise RuntimeError("You must train classifier before predicting data!")

    X = check_array(X)
    X2 = check_array(X2)

    if self.fit_first_intercept:
        X = np.insert(X, 0, 1, axis=1)
    if self.fit_second_intercept:
        X2 = np.insert(X2, 0, 1, axis=1)

    w = np.insert(self.coef1_, 0, self.intercept1_)
    w2 = np.insert(self.coef2_, 0, self.intercept2_)

    return (invlogit_vect(np.dot(w, np.transpose(X))) *
            invlogit_vect(np.dot(w2, np.transpose(X2))))
def predict_proba(self, X): """ Returns the probability of class 1 for each x in X. """ try: getattr(self, "intercept_") getattr(self, "coef_") except AttributeError: raise RuntimeError("You must train classifer before predicting data!") X = check_array(X) if self.fit_intercept: X = np.insert(X, 0, 1, axis=1) w = np.insert(self.coef_, 0, self.intercept_) return invlogit_vect(np.dot(w, np.transpose(X)))
def fit(self, X, y, **fit_params):
    assert len(X) == len(y)
    if self.check_X is not None:
        assert self.check_X(X)
    if self.check_y is not None:
        assert self.check_y(y)
    self.classes_ = np.unique(check_array(y, ensure_2d=False, allow_nd=True))
    if self.expected_fit_params:
        missing = set(self.expected_fit_params) - set(fit_params)
        assert len(missing) == 0, ('Expected fit parameter(s) %s not '
                                   'seen.' % list(missing))
        for key, value in fit_params.items():
            assert len(value) == len(X), ('Fit parameter %s has length %d; '
                                          'expected %d.'
                                          % (key, len(value), len(X)))
    return self
def predict(self, X): """ Predict class value for X. :param X: {array-like, sparse matrix}, shape (n_samples, n_features). Input data, where `n_samples` is the number of samples and `n_features` is the number of features. :return: Returns self. """ # Numpy X = np.array(X) # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) return np.argmax(self.model.predict(X, verbose=self.verbose), axis=1)
def predict_proba(self, X): """ Predict class probabilities for X. :param X: {array-like, sparse matrix}, shape (n_samples, n_features). Input data, where `n_samples` is the number of samples and `n_features` is the number of features. :return: Returns self. """ # Numpy X = np.array(X) # Check is fit had been called check_is_fitted(self, ['X_', 'y_']) # Input validation X = check_array(X) return self.model.predict_proba(X, verbose=self.verbose)
def _validate_X_predict(self, X, check_input):
    """Validate X whenever one tries to predict, apply, predict_proba"""
    if self.tree_ is None:
        raise NotFittedError("Estimator not fitted, "
                             "call `fit` before exploiting the model.")

    if check_input:
        X = check_array(X, dtype=DTYPE, accept_sparse="csr")
        if issparse(X) and (X.indices.dtype != np.intc or
                            X.indptr.dtype != np.intc):
            raise ValueError("No support for np.int64 index based "
                             "sparse matrices")

    n_features = X.shape[1]
    if self.n_features_ != n_features:
        raise ValueError("Number of features of the model must "
                         "match the input. Model n_features is %s and "
                         "input n_features is %s "
                         % (self.n_features_, n_features))

    return X
def _labels_cost(X, centroids):
    """Calculate labels and cost function given a matrix of points and
    a list of centroids for the k-modes algorithm.
    """
    X = check_array(X, dtype="object")

    npoints = X.shape[0]
    cost = 0.
    labels = np.empty(npoints, dtype='int64')
    for ipoint, curpoint in enumerate(X):
        diss = matching_dissim(centroids, curpoint)
        clust = np.argmin(diss)
        labels[ipoint] = clust
        cost += diss[clust]

    return labels, cost
def transform(self, X=None):
    """Applies the learned transformation to the inputs.

    Parameters
    ----------
    X : array_like
        An array of data samples with shape (n_samples, n_features_in)
        (default: None, defined when fit is called).

    Returns
    -------
    array_like
        An array of transformed data samples with shape
        (n_samples, n_features_out).
    """
    if X is None:
        X = self.X_
    else:
        X = check_array(X)

    return X.dot(self.L_.T)
def inverse_transform(self, X):
    """Undo the scaling of X according to feature_range.

    Parameters
    ----------
    X : array-like with shape [n_samples, n_features]
        Input data that will be transformed.
    """
    check_is_fitted(self, 'scale_')

    X = check_array(X, copy=self.copy, accept_sparse="csc", ensure_2d=False)
    X -= self.min_
    X /= self.scale_
    return X
def transform(self, X, y=None, copy=None):
    """Perform standardization by centering and scaling

    Parameters
    ----------
    X : array-like with shape [n_samples, n_features]
        The data used to scale along the features axis.
    """
    check_is_fitted(self, 'std_')

    copy = copy if copy is not None else self.copy
    X = check_array(X, copy=copy, accept_sparse="csc", dtype=np.float32,
                    ensure_2d=False)

    if sparse.issparse(X):
        if self.center_sparse:
            for i in range(X.shape[1]):
                X.data[X.indptr[i]:X.indptr[i + 1]] -= self.mean_[i]
        elif self.with_mean:
            raise ValueError(
                "Cannot center sparse matrices: pass `with_mean=False` "
                "instead. See docstring for motivation and alternatives.")
        else:
            pass

        if self.std_ is not None:
            inplace_column_scale(X, 1 / self.std_)
    else:
        if self.with_mean:
            X -= self.mean_
        if self.with_std:
            X /= self.std_
    return X
def predict(self, X): """ A reference implementation of a predicting function. Parameters ---------- X : array-like of shape = [n_samples, n_features] The input samples. Returns ------- y : array of shape = [n_samples] Returns :math:`x^2` where :math:`x` is the first column of `X`. """ X = check_array(X) return X[:, 0]**2
def transform(self, X): """ A reference implementation of a transform function. Parameters ---------- X : array-like of shape = [n_samples, n_features] The input samples. Returns ------- X_transformed : array of int of shape = [n_samples, n_features] The array containing the element-wise square roots of the values in `X` """ # Check is fit had been called check_is_fitted(self, ['input_shape_']) # Input validation X = check_array(X) # Check that the input is of the same shape as the one passed # during fit. if X.shape != self.input_shape_: raise ValueError('Shape of input is different from what was seen' 'in `fit`') return np.sqrt(X)
def fit(self, X, y=None): """ Parameters ---------- X : {array, sparse matrix}, shape (n_samples, n_features) List of n_features-dimensional data points. Each row corresponds to a single data point. Returns ------- self : object Returns self. """ from simhash import compute self._fit_X = X = check_array(X, accept_sparse='csr') n_features = X.shape[1] def _scale_hash_32_64bit(indices): return indices*((2**64-1)//2**32-1) hash_func = self.hash_func hashing_table = np.array( [hash_func(el, 0) for el in range(n_features)], dtype='uint64') shash = [] for idx in range(X.shape[0]): # get hashes of indices mhash = hashing_table[X[idx].indices] if self.hash_func_nbytes == 32: mhash = _scale_hash_32_64bit(mhash) shash.append(compute(mhash)) _fit_shash = np.asarray(shash, dtype='uint64') self._fit_shash = _fit_shash self._fit_shash_dict = {val: key for key, val in enumerate(self._fit_shash)}
def fit(self, X, y): """Fit the model using X as training data Parameters ---------- X : {array-like, sparse matrix, BallTree, KDTree} Training data, shape [n_samples, n_features], """ X = check_array(X, accept_sparse='csr') y = np.asarray(y, dtype='int') y_unique = np.unique(y) index = np.arange(len(y), dtype='int') if len(y_unique) == 0: raise ValueError('The training set must have at least ' 'one document category!') # define nearest neighbors search objects for each category self._mod = [NearestNeighbors(n_neighbors=1, leaf_size=self.leaf_size, algorithm=self.algorithm, n_jobs=self.n_jobs, # euclidean metric by default metric='cosine', ) for el in range(len(y_unique))] index_mapping = [] for imod, y_val in enumerate(y_unique): mask = (y == y_val) index_mapping.append(index[mask]) self._mod[imod].fit(X[mask]) self.index_mapping = index_mapping
def kneighbors(self, X=None, batch_size=5000):
    """Finds the K-neighbors of a point.

    Returns indices of and distances to the neighbors of each point.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        the input array
    batch_size : int
        the batch size

    Returns
    -------
    S_cos : array [n_samples, n_categories]
        the cosine similarity to closest point in each category
    indices : array [n_samples, n_categories]
        Indices of the nearest points in the population matrix.
    """
    X = check_array(X, accept_sparse='csr')

    n_classes = len(self._mod)

    S_res = np.zeros((X.shape[0], n_classes), dtype='float')
    nn_idx_res = np.zeros((X.shape[0], n_classes), dtype='int')

    for imod in range(n_classes):
        D_i, nn_idx_i_loc = _chunk_kneighbors(self._mod[imod].kneighbors,
                                              X,
                                              batch_size=batch_size)

        # only NearestNeighbor-1 (only one column in the kneighbors output)
        # convert from euclidean distance in L2 norm space to cosine
        # similarity
        # S_cos = seuclidean_dist2cosine_sim(D_i[:,0])
        S_res[:, imod] = 1 - D_i[:, 0]

        # map local index within index_mapping to global index
        nn_idx_res[:, imod] = self.index_mapping[imod][nn_idx_i_loc[:, 0]]

    return S_res, nn_idx_res
def fit(self, X, y=None): """Learn the document lenght and document frequency vector (if necessary). Parameters ---------- X : sparse matrix, [n_samples, n_features] a matrix of term/token counts """ X = check_array(X, ['csr'], copy=self.copy) scheme_t, scheme_d, scheme_n = _validate_smart_notation(self.weighting) self.dl_ = _document_length(X) if scheme_d in 'stp' or self.compute_df: self.df_ = _document_frequency(X) else: self.df_ = None if sp.isspmatrix_csr(X): self.du_ = np.diff(X.indptr) else: self.du_ = X.shape[-1] - (X == 0).sum(axis=1) self._n_features = X.shape[1] if self.df_ is not None: df_n_samples = len(self.dl_) else: df_n_samples = None if scheme_n.endswith('p') and self.norm_pivot is None: # Need to compute the pivot if it's not provided _, self.norm_pivot = _smart_tfidf(X, self.weighting, self.df_, df_n_samples, norm_alpha=self.norm_alpha, norm_pivot=self.norm_pivot, return_pivot=True) return self
def fit_transform(self, X, y=None):
    """Apply document term weighting and normalization on text features

    Parameters
    ----------
    X : sparse matrix, [n_samples, n_features]
        a matrix of term/token counts
    """
    X = check_array(X, ['csr'], copy=self.copy)
    scheme_t, scheme_d, scheme_n = _validate_smart_notation(self.weighting)
    self.dl_ = _document_length(X)
    if scheme_d in 'stpd' or self.compute_df:
        self.df_ = _document_frequency(X)
    else:
        self.df_ = None
    if sp.isspmatrix_csr(X):
        self.du_ = np.diff(X.indptr)
    else:
        self.du_ = X.shape[-1] - (X == 0).sum(axis=1)
    self._n_features = X.shape[1]

    if self.df_ is not None:
        df_n_samples = len(self.dl_)
    else:
        df_n_samples = None

    X, self.norm_pivot = _smart_tfidf(X, self.weighting, self.df_,
                                      df_n_samples,
                                      norm_alpha=self.norm_alpha,
                                      norm_pivot=self.norm_pivot,
                                      return_pivot=True)
    return X
def transform(self, X, y=None):
    """Apply document term weighting and normalization on text features

    Parameters
    ----------
    X : sparse matrix, [n_samples, n_features]
        a matrix of term/token counts
    copy : boolean, default True
        Whether to copy X and operate on the copy or perform
        in-place operations.
    """
    X = check_array(X, ['csr'], copy=self.copy)
    check_is_fitted(self, 'dl_', 'vector is not fitted')
    if X.shape[1] != self._n_features:
        raise ValueError(('Model fitted with n_features={} '
                          'but X.shape={}')
                         .format(self._n_features, X.shape))

    if self.df_ is not None:
        df_n_samples = len(self.dl_)
    else:
        df_n_samples = None

    return _smart_tfidf(X, self.weighting, self.df_, df_n_samples,
                        norm_alpha=self.norm_alpha,
                        norm_pivot=self.norm_pivot)
def check_array(self, X):
    from sklearn.utils.validation import check_array
    return check_array(X, allow_nd=True, estimator="GPR")
def _predict(self, X): if not hasattr(self, "P_"): raise NotFittedError("Estimator not fitted.") X = check_array(X, accept_sparse='csc', dtype=np.double) X = self._augment(X) return self._get_output(X)
def _predict(self, X): if not hasattr(self, "U_"): raise NotFittedError("Estimator not fitted.") X = check_array(X, accept_sparse='csc', dtype=np.double) X = self._augment(X) X = get_dataset(X, order='fortran') return _lifted_predict(self.U_, X)
def check_feature_array(array, n_features=None):
    array = check_array(array, ensure_2d=True, allow_nd=False)
    if n_features is not None and array.shape[1] != n_features:
        raise ValueError('feature array must have exactly %d features'
                         % n_features)
    return array
def check_multilabel_array(array, n_labels=None, force_binary=True):
    array = check_array(array, ensure_2d=True, allow_nd=False, dtype=int)
    if n_labels is not None and array.shape[1] != n_labels:
        raise ValueError('multilabel array must have exactly %d labels'
                         % n_labels)
    if force_binary:
        count_ones = np.count_nonzero(array == 1)
        count_zeros = np.count_nonzero(array == 0)
        if np.size(array) != count_ones + count_zeros:
            raise ValueError('multilabel array must be binary')
    return array
def predict(self, X):
    check_is_fitted(self, ['X_', 'y_'])
    X = check_array(X)
    X = DynamicBayesianClassifier._first_col(X)
    return self._predict(X)
def parallel_fit(self, X, y, client_token=None, est_timeout=None):
    self.n_outputs_ = 1
    self.classes_ = np.array(np.unique(check_array(y, ensure_2d=False,
                                                   allow_nd=True,
                                                   dtype=None)))

    if est_timeout is None:
        est_timeout = int(1e6)

    # Store X and y data for workers to use
    with open(self.X_file.name, 'wb') as outfile:
        pickle.dump(X, outfile, pickle.HIGHEST_PROTOCOL)
    with open(self.y_file.name, 'wb') as outfile:
        pickle.dump(y, outfile, pickle.HIGHEST_PROTOCOL)

    sigopt_procs = []
    for build_args in self.estimator_build_args:
        # run separate python process for each estimator with timeout
        # these processes are wrapped in timeout command to capture case
        # where a single observation never completes
        sigopt_procs.append(Popen([
            "timeout", str(est_timeout + 10),
            "python", sklearn_fit.__file__,
            "--opt_timeout", str(est_timeout),
            "--estimator", build_args['estimator'],
            "--X_file", build_args['X_file'],
            "--y_file", build_args['y_file'],
            "--client_token", client_token,
            "--output_file", build_args['output_file']
        ]))
    exit_codes = [p.wait() for p in sigopt_procs]
    return_codes_args = zip(exit_codes, self.estimator_build_args)

    # remove estimators that errored or timed out
    valid_est_args = [rc_args[1] for rc_args in return_codes_args
                      if rc_args[0] == 0]

    # load valid estimators back into memory
    for est_arg in valid_est_args:
        with open(est_arg['output_file'], 'rb') as infile:
            clf = pickle.load(infile)
            self.estimator_ensemble.append(clf)
def transform(self, X, mask=None):
    """Reduce X to the selected features.

    Parameters
    ----------
    X : array of shape [n_samples, n_features]
        The input samples.

    Returns
    -------
    X_r : array of shape [n_samples, n_selected_features]
        The input samples with only the selected features.
    """
    X = check_array(X, accept_sparse='csr')

    if mask is None:
        mask = self.get_support()

    if not mask.any():
        warn("No features were selected: either the data is"
             " too noisy or the selection test too strict.",
             UserWarning)
        return np.empty(0).reshape((X.shape[0], 0))
    if len(mask) != X.shape[1]:
        raise ValueError("X has a different shape than during fitting.")
    return X[:, self.safe_mask(X, mask)]
def predict(self, x): check_is_fitted(self, "coef_") x = check_array(x) return (self.intercept_ + x @ self.coef_nominator_) / (1 + x @ self.coef_denominator_)
def predict(self, X):
    # scikit-learn checks
    X = check_array(X)
    return np.array([self._predict(x) for x in X])
def fit(self, x, y):
    x = check_array(x)
    _, self.n_out = y.reshape(y.shape[0], -1).shape
    _, n_features = x.shape
    terminals = [Symbol("x_{}".format(i)) for i in range(n_features)]
    self.pset = create_pset(self.operators + terminals + self.constants)
    cls = Cartesian(str(hash(self)), self.pset,
                    n_rows=self.n_rows, n_columns=self.n_columns,
                    n_out=self.n_out, n_back=self.n_back)
    self.res = oneplus(evaluate(x, y, self.metric),
                       random_state=self.random_state, cls=cls,
                       lambda_=self.lambda_, max_iter=self.max_iter,
                       max_nfev=self.max_nfev, f_tol=self.f_tol,
                       n_jobs=self.n_jobs, seed=self.seed)
    self.model = compile(self.res.expr)
    return self
def predict(self, X): """Predict class for every sample in X. Parameters ---------- X : array-like of shape = [n_samples, n_features_idx] The input samples. Returns ------- y : array of shape = [n_samples] """ check_is_fitted(self, 'tree_') X = check_array(X) n_features = X.shape[1] if n_features != self.n_features_: raise ValueError("Number of features of the model must " "match the input. Model n_features is {} and " "input n_features is {}." .format(self.n_features_, n_features)) X_ = np.empty(X.shape) for i in range(self.n_features_): if self.is_numerical_[i]: X_[:, i] = X[:, i] else: try: X_[:, i] = self.X_encoders_[i].transform(X[:, i]) except ValueError as e: raise ValueError('New attribute value not found in ' 'train data.') y = self.builder_._predict(self.tree_, X_) return self.y_encoder_.inverse_transform(y)
def estimate_seasonal_differencing_term(self, x):
    """Estimate the seasonal differencing term.

    Parameters
    ----------
    x : array-like, shape=(n_samples,)
        The time series vector.
    """
    if not self._base_case(x):
        return 0

    # ensure vector
    x = column_or_1d(check_array(
        x, ensure_2d=False, dtype=DTYPE,
        force_all_finite=True))  # type: np.ndarray

    n = x.shape[0]
    m = int(self.m)

    if n < 2 * m + 5:
        return 0

    chstat = self._sd_test(x, m)

    crit_vals = c(0.4617146, 0.7479655, 1.0007818,
                  1.2375350, 1.4625240, 1.6920200,
                  1.9043096, 2.1169602, 2.3268562,
                  2.5406922, 2.7391007)

    if m <= 12:
        return int(chstat > crit_vals[m - 2])  # R does m - 1...
    if m == 24:
        return int(chstat > 5.098624)
    if m == 52:
        return int(chstat > 10.341416)
    if m == 365:
        return int(chstat > 65.44445)

    return int(chstat > 0.269 * (m ** 0.928))
def _my_lrap(y_true, y_score):
    """Simple implementation of label ranking average precision"""
    check_consistent_length(y_true, y_score)
    y_true = check_array(y_true)
    y_score = check_array(y_score)
    n_samples, n_labels = y_true.shape
    score = np.empty((n_samples, ))
    for i in range(n_samples):
        # The best rank corresponds to 1. Ranks higher than 1 are worse.
        # The best inverse ranking corresponds to n_labels.
        unique_rank, inv_rank = np.unique(y_score[i], return_inverse=True)
        n_ranks = unique_rank.size
        rank = n_ranks - inv_rank

        # Ranks need to be corrected to take ties into account
        # ex: rank 1 ex aequo means that both labels are rank 2.
        corr_rank = np.bincount(rank, minlength=n_ranks + 1).cumsum()
        rank = corr_rank[rank]

        relevant = y_true[i].nonzero()[0]
        if relevant.size == 0 or relevant.size == n_labels:
            score[i] = 1
            continue

        score[i] = 0.
        for label in relevant:
            # Let's count the number of relevant labels with a better
            # (smaller) rank.
            n_ranked_above = sum(rank[r] <= rank[label] for r in relevant)

            # Weight by the rank of the actual label
            score[i] += n_ranked_above / rank[label]

        score[i] /= relevant.size

    return score.mean()
def _check_rows_and_columns(a, b):
    """Unpacks the row and column arrays and checks their shape."""
    check_consistent_length(*a)
    check_consistent_length(*b)
    checks = lambda x: check_array(x, ensure_2d=False)
    a_rows, a_cols = map(checks, a)
    b_rows, b_cols = map(checks, b)
    return a_rows, a_cols, b_rows, b_cols
def predict(self, X):
    X = check_array(X)
    return np.ones(X.shape[0])
def predict(self, X):
    if not hasattr(self, 'coef_'):
        raise CorrectNotFittedError("estimator is not fitted yet")
    X = check_array(X)
    return np.ones(X.shape[0])
def predict(self, X): """Perform classification on an array of test vectors X. Parameters ---------- X : array-like, shape = (n_samples, n_features) Returns ------- C : array, shape = (n_samples,) Predicted target values for X, values are from ``classes_`` """ check_is_fitted(self, ["classes_", "n_classes_"]) X = check_array(X) return self.base_estimator_.predict(X)
def run(self, data): """Compute biclustering. Parameters ---------- data : numpy.ndarray """ data = check_array(data, dtype=np.double, copy=True) self._validate_parameters() num_rows, num_cols = data.shape biclusters = [] for i, j in combinations(range(num_rows), 2): cols, corr = self._find_cols(data[i], data[j]) if len(cols) >= self.min_cols and corr >= self.correlation_threshold: rows = [i, j] for k, r in enumerate(data): if k != i and k != j and self._accept(data, rows, cols, r): rows.append(k) b = Bicluster(rows, cols) if not self._exists(biclusters, b): biclusters.append(b) return Biclustering(biclusters)
def run(self, data): """Compute biclustering. Parameters ---------- data : numpy.ndarray """ data = check_array(data, dtype=self._data_type, copy=True) self._validate_parameters() if self.__sleep: sleep(1) # some executables require the number of rows and columns of the dataset as an input argument self._num_rows, self._num_cols = data.shape # creating temp dir to store the executable's inputs and outputs os.mkdir(self.__tmp_dir) self._write_data(data) os.system(self.__exec_comm.format(**self.__dict__)) biclustering = self._parse_output() # removing temp dir shutil.rmtree(self.__tmp_dir) return biclustering
def run(self, data): """Compute biclustering. Parameters ---------- data : numpy.ndarray """ data = check_array(data, dtype=np.double, copy=True) self._validate_parameters() residuals = np.copy(data) num_rows, num_cols = residuals.shape biclusters, layers = [], [] if self.fit_background_layer: background_layer = self._create_layer(residuals) layers.append(background_layer) residuals -= background_layer biclusters.append(Bicluster(np.arange(num_rows), np.arange(num_cols))) for i in range(self.num_biclusters): rows, cols, bicluster_layer = self._fit_layer(residuals) if len(rows) == 0 or len(cols) == 0 or not self._is_significant(residuals, bicluster_layer): break residuals[rows[:, np.newaxis], cols] -= bicluster_layer layers.append(bicluster_layer) biclusters.append(Bicluster(rows, cols)) self._back_fitting(residuals, layers, biclusters) biclustering = Biclustering(biclusters) if self.fit_background_layer: biclusters.pop(0) return biclustering
def run(self, data): """Compute biclustering. Parameters ---------- data : numpy.ndarray """ data = check_array(data, dtype=np.bool, copy=True) self._validate_parameters() data = [np.packbits(row) for row in data] biclusters = [] patterns_found = set() for ri, rj in combinations(data, 2): pattern = np.bitwise_and(ri, rj) pattern_cols = sum(popcount(int(n)) for n in pattern) if pattern_cols >= self.min_cols and self._is_new(patterns_found, pattern): rows = [k for k, r in enumerate(data) if self._match(pattern, r)] if len(rows) >= self.min_rows: cols = np.where(np.unpackbits(pattern) == 1)[0] biclusters.append(Bicluster(rows, cols)) return Biclustering(biclusters)
def run(self, data): """Compute biclustering. Parameters ---------- data : numpy.ndarray """ data = check_array(data, dtype=np.int, copy=True) self._validate_parameters() num_remaining_rows, num_cols = data.shape remaining_rows = np.ones(num_remaining_rows, np.bool) biclusters = [] for i in range(self.num_biclusters): indices = np.where(remaining_rows)[0] b = self._find_motif(data, indices) biclusters.append(b) remaining_rows[b.rows] = False num_remaining_rows -= len(b.rows) if num_remaining_rows == 0: break return Biclustering(biclusters)
def run(self, data): """Compute biclustering. Parameters ---------- data : numpy.ndarray """ data = check_array(data, dtype=np.double, copy=True) self._validate_parameters() data = scale(data) if self.transform: data = np.sign(data) * np.log(1 + np.abs(data)) data = scale(data) biclusters = [] for i in range(self.num_biclusters): best, avg, score = max((self._find_bicluster(data) for i in range(self.randomized_searches)), key=itemgetter(-1)) if score < self.score_threshold: break data[np.ix_(best.rows, best.cols)] -= avg biclusters.append(best) return Biclustering(biclusters)