The following 31 code examples, extracted from open-source Python projects, illustrate how to use numpy.random.permutation().
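Before the project examples, here is a minimal standalone sketch of the two call patterns numpy.random.permutation() supports (the array names are illustrative only): passing an integer returns a shuffled index array, and passing an array returns a row-shuffled copy.

import numpy as np

# Passing an integer n returns a permuted copy of np.arange(n); this is the
# usual way to build shuffled indices for a dataset.
idx = np.random.permutation(5)          # e.g. array([3, 0, 4, 1, 2])

# Passing an array returns a copy shuffled along its first axis; the input
# itself is left untouched (unlike np.random.shuffle, which works in place).
X = np.arange(12).reshape(6, 2)
X_shuffled = np.random.permutation(X)

# To shuffle paired arrays consistently, permute the indices once and use
# them to index both arrays.
y = np.arange(6)
perm = np.random.permutation(len(X))
X_shuf, y_shuf = X[perm], y[perm]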
def compute_pvalue_with_time_tracking(self, data_x=None, data_y=None):
    if data_x is None and data_y is None:
        if not self.streaming and not self.freeze_data:
            start = time.clock()
            self.generate_data()
            data_generating_time = time.clock() - start
            data_x = self.data_x
            data_y = self.data_y
        else:
            data_generating_time = 0.
    else:
        data_generating_time = 0.
    #print 'data generating time passed: ', data_generating_time
    SubHSIC_statistic = self.SubHSIC_statistic(unbiased=self.unbiased, data_x=data_x, data_y=data_y)
    null_samples = zeros(self.num_shuffles)
    for jj in range(self.num_shuffles):
        pp = permutation(self.num_samples)
        yy = self.data_y[pp, :]
        null_samples[jj] = self.SubHSIC_statistic(data_x=data_x, data_y=yy, unbiased=self.unbiased)
    pvalue = (sum(null_samples > SubHSIC_statistic)) / float(self.num_shuffles)
    return pvalue, data_generating_time
def compute_pvalue_with_time_tracking(self, data_x=None, data_y=None):
    if data_x is None and data_y is None:
        if not self.streaming and not self.freeze_data:
            start = time.clock()
            self.generate_data()
            data_generating_time = time.clock() - start
            data_x = self.data_x
            data_y = self.data_y
        else:
            data_generating_time = 0.
    else:
        data_generating_time = 0.
    print 'data generating time passed: ', data_generating_time
    SubCorr_statistic = self.SubCorr_statistic(data_x=data_x, data_y=data_y)
    null_samples = zeros(self.num_shuffles)
    for jj in range(self.num_shuffles):
        pp = permutation(self.num_samples)
        yy = self.data_y[pp, :]
        null_samples[jj] = self.SubCorr_statistic(data_x=data_x, data_y=yy)
    pvalue = (sum(null_samples > SubCorr_statistic)) / float(self.num_shuffles)
    return pvalue, data_generating_time
def train_ordering(train_imgs, train_masks, train_index):
    ordering = pd.DataFrame(data=np.arange(len(train_index[:, 0])), columns=['initial_order'],
                            index=pd.MultiIndex.from_arrays([train_index[:, 0], train_index[:, 1]],
                                                            names=['subject', 'image']))
    ordering.sort_index(inplace=True, level='subject')
    # shuffle intra subject
    ordering = ordering.groupby(level='subject').apply(lambda x: x.iloc[random.permutation(len(x))])
    ordering.index = ordering.index.droplevel(0)
    # create new column with new subject image order
    ordering['new'] = 0
    ordering['new'] = ordering['new'].groupby(level='subject').transform(lambda x: np.arange(len(x)).T)
    # take all first images per subject and so on
    final_ordering = np.array([])
    for i in ordering['new'].unique():
        idx = ordering.loc[ordering['new'] == i, 'initial_order']  # indexes of i'th image for each user after shuffling
        idx = idx.iloc[random.permutation(len(idx))]  # shuffle users in batch
        final_ordering = np.hstack((final_ordering, idx.values))
    final_ordering = final_ordering.astype(int)
    train_imgs, train_masks, train_index = train_imgs[final_ordering], train_masks[final_ordering], train_index[final_ordering]
    return train_imgs, train_masks, train_index
def sample_parallel_helper(params):
    '''
    Parameters
    ----------
    params: (i,(statistic, population_A, population_B, NA, NB, ntrials))

    Returns
    -------
    '''
    (i, (statistic, population_A, population_B, NA, NB, ntrials)) = params
    numpy.random.seed()
    if NA is None:
        NA = len(population_A)
    else:
        assert NA <= len(population_A)
    if NB is None:
        NB = len(population_B)
    else:
        assert NB <= len(population_B)
    result = []
    for i in range(ntrials):
        shuffle = random.permutation(concatenate([population_A, population_B]))
        result.append(abs(statistic(shuffle[:NA]) - statistic(shuffle[-NB:])))
    return i, result
def lambda_newton_direction(self, active, fixed, vary, max_iter=1):
    # TODO we should be able to do a warm start...
    delta = np.zeros_like(vary.Sigma)
    U = np.zeros_like(vary.Sigma)

    for _ in range(max_iter):
        for i, j in rng.permutation(np.array(active).T):
            if i > j:  # seems ok since we look for upper triangular indices in active set
                continue

            if i == j:
                a = vary.Sigma[i, i] ** 2 + 2 * vary.Sigma[i, i] * vary.Psi[i, i]
            else:
                a = (vary.Sigma[i, j] ** 2 + vary.Sigma[i, i] * vary.Sigma[j, j] +
                     vary.Sigma[i, i] * vary.Psi[j, j] + 2 * vary.Sigma[i, j] * vary.Psi[i, j] +
                     vary.Sigma[j, j] * vary.Psi[i, i])

            b = (fixed.Syy[i, j] - vary.Sigma[i, j] - vary.Psi[i, j] +
                 np.dot(vary.Sigma[i, :], U[:, j]) +
                 np.dot(vary.Psi[i, :], U[:, j]) +
                 np.dot(vary.Psi[j, :], U[:, i]))

            if i == j:
                u = -b / a
                delta[i, i] += u
                U[i, :] += u * vary.Sigma[i, :]
            else:
                c = self.Lam[i, j] + delta[i, j]
                u = soft_thresh(self.lamL / a, c - b / a) - c
                delta[j, i] += u
                delta[i, j] += u
                U[j, :] += u * vary.Sigma[i, :]
                U[i, :] += u * vary.Sigma[j, :]

    return delta
def shuffle(self):
    batch = self.FLAGS.batch
    data = self.parse()
    size = len(data)

    print('Dataset of {} instance(s)'.format(size))
    if batch > size:
        self.FLAGS.batch = batch = size
    batch_per_epoch = int(size / batch)

    for i in range(self.FLAGS.epoch):
        shuffle_idx = perm(np.arange(size))
        for b in range(batch_per_epoch):
            # yield these
            x_batch = list()
            feed_batch = dict()

            for j in range(b * batch, b * batch + batch):
                train_instance = data[shuffle_idx[j]]
                inp, new_feed = self._batch(train_instance)

                if inp is None:
                    continue
                x_batch += [np.expand_dims(inp, 0)]

                for key in new_feed:
                    new = new_feed[key]
                    old_feed = feed_batch.get(key, np.zeros((0,) + new.shape))
                    feed_batch[key] = np.concatenate([old_feed, [new]])

            x_batch = np.concatenate(x_batch, 0)
            yield x_batch, feed_batch

        print('Finish {} epoch(es)'.format(i + 1))
def joint_and_product_of_the_marginals_split(z, ds):
    """ Split to samples from the joint and the product of the marginals.

    Parameters
    ----------
    z : (number of samples, dimension)-ndarray
        Sample points.
    ds : int vector
        Dimension of the individual subspaces in z; ds[i] = i^th subspace
        dimension.

    Returns
    -------
    x : (number of samplesx, dimension)-ndarray
        Samples from the joint.
    y : (number of samplesy, dimension)-ndarray
        Sample from the product of the marginals; it is independent of x.
    """

    # verification (sum(ds) = z.shape[1]):
    if sum(ds) != z.shape[1]:
        raise Exception('sum(ds) must be equal to z.shape[1]; in other ' +
                        'words the subspace dimensions do not sum to the' +
                        ' total dimension!')

    # 0,d_1,d_1+d_2,...,d_1+...+d_{M-1}; starting indices of the subspaces:
    cum_ds = cumsum(hstack((0, ds[:-1])))
    num_of_samples, dim = z.shape
    num_of_samples2 = num_of_samples // 2  # integer division

    # x, y:
    x = z[:num_of_samples2, :]
    y = zeros((num_of_samples2, dim))  # preallocation
    for m in range(len(ds)):
        idx = range(cum_ds[m], cum_ds[m] + ds[m])
        y[:, idx] = z[ix_(num_of_samples2 + permutation(num_of_samples2), idx)]

    return x, y
def test_split_data(self):
    X, y = [], []
    N = random.randint(10, 1000)
    for i in range(N):
        X.append(random.rand(random.randint(4, 100)).tolist())
        y.append(random.randint(0, 10))
    perm_indices = random.permutation(N)

    k = 10
    for k_idx in range(k):
        (X_train, y_train), (X_test, y_test) = split_data(
            X, y, k_idx=k_idx, k=10, perm_indices=perm_indices)
        assert sorted(X) == sorted(X_train + X_test)
        assert sorted(y) == sorted(y_train + y_test)

        (X_train_1, y_train_1), (X_test_1, y_test_1) = split_data(
            X, y, k_idx=k_idx, k=10, perm_indices=perm_indices)
        (X_train_2, y_train_2), (X_test_2, y_test_2) = split_data(
            X, y, k_idx=k_idx, k=10, perm_indices=perm_indices)

        assert len(X_train_1) == len(X_train_2)
        for idx in range(len(X_train_1)):
            assert X_train_1[idx] == X_train_2[idx]
            assert y_train_1[idx] == y_train_2[idx]

        assert len(X_test_1) == len(X_test_2)
        for idx in range(len(X_test_1)):
            assert X_test_1[idx] == X_test_2[idx]
            assert y_test_1[idx] == y_test_2[idx]
def get_sigma_median_heuristic(X, is_sparse=False):
    if is_sparse:
        X = X.todense()
    n = shape(X)[0]
    if n > 1000:
        X = X[permutation(n)[:1000], :]
    dists = squareform(pdist(X, 'euclidean'))
    median_dist = median(dists[dists > 0])
    sigma = median_dist / sqrt(2.)
    return sigma
def compute_null_samples_and_pvalue(self, data_x=None, data_y=None, data_z=None):
    '''
    data_x, data_y, data_z are the given data for which we wish to test the
    conditional independence of data_x and data_y given data_z.
    > each data set has the number of samples = number of rows
    > the bandwidths for the training set and the test set will be different
      (we compute them as soon as the data comes in)
    '''
    if data_x is None and data_y is None and data_z is None:
        if not self.streaming and not self.freeze_data:
            start = time.clock()
            self.generate_data(isConditionalTesting=True)
            data_generating_time = time.clock() - start
            data_x = self.data_x
            data_y = self.data_y
            data_z = self.data_z
            #print "dimension of data:", np.shape(data_x)
        else:
            data_generating_time = 0.
    else:
        data_generating_time = 0.
    #print 'Data generating time passed: ', data_generating_time
    hsic_statistic, K_epsilon_x, K_epsilon_y, X_CVerror, Y_CVerror = \
        self.compute_test_statistics_and_others(data_x, data_y, data_z)
    if self.num_shuffles != 0:
        ny = np.shape(K_epsilon_y)[0]
        null_samples = np.zeros(self.num_shuffles)
        for jj in range(self.num_shuffles):
            pp = permutation(ny)
            Kp = K_epsilon_y[pp, :][:, pp]
            null_samples[jj] = self.HSIC_V_statistic(K_epsilon_x, Kp)
        pvalue = (sum(null_samples > hsic_statistic) + 1) / float(self.num_shuffles + 1)
        #print "P-value:", pvalue
    else:
        pvalue = None
        null_samples = 0
        #print "Not interested in P-value"
    return null_samples, hsic_statistic, pvalue, X_CVerror, Y_CVerror, data_generating_time
def turn_into_null(fn):
    def null_fn(*args, **kwargs):
        dataX, dataY = fn(*args, **kwargs)
        num_samples = shape(dataX)[0]
        pp = permutation(num_samples)
        return dataX, dataY[pp]
    return null_fn
def HSIC_with_shuffles(self, data_x=None, data_y=None, unbiased=True, num_shuffles=0,
                       estimate_nullvar=False, isBlockHSIC=False):
    start = time.clock()
    if data_x is None:
        data_x = self.data_x
    if data_y is None:
        data_y = self.data_y
    time_passed = time.clock() - start
    if isBlockHSIC:
        Kx, Ky = self.compute_kernel_matrix_on_dataB(data_x, data_y)
    else:
        Kx, Ky = self.compute_kernel_matrix_on_data(data_x, data_y)
    ny = shape(data_y)[0]
    if unbiased:
        test_statistic = HSICTestObject.HSIC_U_statistic(Kx, Ky)
    else:
        test_statistic = HSICTestObject.HSIC_V_statistic(Kx, Ky)
    null_samples = zeros(num_shuffles)
    for jj in range(num_shuffles):
        pp = permutation(ny)
        Kpp = Ky[pp, :][:, pp]
        if unbiased:
            null_samples[jj] = HSICTestObject.HSIC_U_statistic(Kx, Kpp)
        else:
            null_samples[jj] = HSICTestObject.HSIC_V_statistic(Kx, Kpp)
    if estimate_nullvar:
        nullvarx, nullvary = self.unbiased_HSnorm_estimate_of_centred_operator(Kx, Ky)
        nullvarx = 2. * nullvarx
        nullvary = 2. * nullvary
    else:
        nullvarx, nullvary = None, None
    return test_statistic, null_samples, nullvarx, nullvary, Kx, Ky, time_passed
def fit(self, X):
    """Sample a training set.

    Parameters
    ----------
    X: array-like
        training set to sample observations from.

    Returns
    ----------
    self: obj
        fitted instance with stored sample.
    """
    self.train_shape = X.shape

    sample_idx = {}
    for i in range(2):
        dim_size = min(X.shape[i], self.size)
        sample_idx[i] = permutation(X.shape[i])[:dim_size]

    sample = X[ix_(sample_idx[0], sample_idx[1])]

    self.sample_idx_ = sample_idx
    self.sample_ = sample
    return self
def sparsify(a, num_in, sb, sc):
    assert type(num_in) == int
    A = a * sb
    from numpy.random import rand, permutation
    for i in range(A.shape[1]):
        perm = permutation(A.shape[0])
        SMALL = perm[num_in:]
        A[SMALL, i] *= sc / sb
    a[:] = A

# ------------------------------------------------------------
def demo_iris_svm(C=1.0, gamma=0.7):
    iris = datasets.load_iris()
    perm = permutation(iris.target.size)
    iris.data = iris.data[perm]
    iris.target = iris.target[perm]
    clf = svm.SVC(C, 'rbf', gamma=gamma)
    clf.fit(iris.data[:90], iris.target[:90])
    return clf.score(iris.data[90:], iris.target[90:])
def RandomOrder(ts):
    def _impl(data):
        order = npr.permutation(len(ts))
        for i in order:
            data = ts[i](data)
        return data
    return _impl

# Andre Howard
def __call__(self, perm_len=None):
    perm_len = self._perm_len if perm_len is None else perm_len
    return npr.permutation(perm_len)
def __init__(self, perm_len, num_c, x2c):
    assert perm_len > 0
    self._perm_len = perm_len
    self._num_c = num_c
    self._x2c = np.array(x2c, np.int32)
    self._c2x = []
    for i in xrange(self._num_c):
        self._c2x.append(np.where(self._x2c == i)[0])
    self._cur_c = -1
    self._cls = npr.permutation(self._num_c).tolist()
    self._cur_x = [-1] * self._num_c
    for i in xrange(self._num_c):
        npr.shuffle(self._c2x[i])
def permute_list(l):
    p = list(np_random.permutation(len(l)))
    out_l = [l[ix] for ix in p]
    return (out_l, p)
def yield_data_in_batches(batch_size, X, y=None, shuffle=True):
    """Generates batches of input data.

    Parameters
    ----------
    batch_size: int
        Number of examples in a single batch.
    X: array-like, shape (n_samples, n_features)
        The input data.
    y: array-like, shape (n_samples,)
        The target values. Can be omitted.
    shuffle: bool, default True
        Whether the examples are shuffled or not before put into batches.
    """
    num_rows = X.shape[0]

    if shuffle:
        indices_gen = (i for i in permutation(num_rows))
    else:
        indices_gen = (i for i in np.arange(num_rows))

    num_yielded = 0
    while True:
        batch_indices = list(islice(indices_gen, batch_size))
        num_yielded += len(batch_indices)

        if y is None:
            yield X[batch_indices]
        else:
            yield X[batch_indices], y[batch_indices]

        if num_yielded == num_rows:
            return
def bootstrap_compare_statistic_two_sided(statistic, population_A, population_B, ntrials=1000):
    '''
    Estimate p-value using bootstrapping.

    Parameters
    ----------

    Returns
    -------
    '''
    nA = len(population_A)
    nB = len(population_B)
    n = nA + nB
    allstats = concatenate([population_A, population_B])
    A = statistic(population_A)
    B = statistic(population_B)
    def sample():
        shuffle = random.permutation(allstats)
        draw_A, draw_B = shuffle[:nA], shuffle[nA:]
        s_a = statistic(draw_A)
        s_b = statistic(draw_B)
        return abs(s_a - s_b)
    null_samples = array([sample() for i in xrange(ntrials)])
    delta = abs(A - B)
    pvalue = mean(null_samples > delta)
    return delta, pvalue
def crossvalidatedAUC(X, Y, NXVAL=4):
    '''
    Crossvalidated area under the ROC curve calculation. This routine
    uses the non-regularized GLMPenaltyL2 to fit a GLM point-process
    model and test accuracy under K-fold crossvalidation.

    Parameters
    ----------
    X : np.array
        Covariate matrix Nsamples x Nfeatures
    Y : np.array
        Binary point-process observations, 1D array length Nsamples
    NXVAL : positive int
        Defaults to 4. Number of cross-validation blocks to use

    Returns
    -------
    float
        Area under the ROC curve, cross-validated, for non-regularized
        GLM point process model fit
    '''
    N = X.shape[0]
    P = permutation(N)
    X = X[P, :]
    Y = Y[P]
    blocksize = N // NXVAL
    predicted = []
    M = np.zeros(X.shape[1] + 1)
    for i in range(NXVAL):
        a = i * blocksize
        b = a + blocksize
        if i == NXVAL - 1:
            b = N
        train_X = concatenate([X[:a, :], X[b:, :]])
        train_Y = concatenate([Y[:a], Y[b:]])
        objective, gradient, hessian = GLMPenaltyL2(train_X, train_Y, 0)
        M = minimize(objective, M, jac=gradient, hess=hessian, method='Newton-CG')['x']
        mu, B = M[0], M[1:]
        predicted.append(mu + X[a:b, :].dot(B))
    return auc(Y, concatenate(predicted))
def tm_ransac_more_cols(d, sol, sys):
    r_c = d.shape
    n = r_c[1]
    d2 = d ** 2
    trycols = setdiff(range(0, n), sol.cols)
    cl, dl = compactionmatrix(len(sol.rows))
    u, s, vh = linalg.svd(sol.Bhat[1:, 1:])
    u = u[:, 0:2]
    for ii in trycols:
        d2n = d2[sol.rows - 1, ii - 1]
        maxnrinl = 0
        for kk in range(0, sys.ransac_k2):
            okrows = ((isfinite(d2n)).astype(int)).nonzero()
            tmp = random.permutation(len(okrows))
            if len(tmp) >= 4:
                tryrows1 = okrows[tmp[0:3]]
                zz = linalg.inv(dl) * sol.Bhat[:, 0]
                ZZ_1 = concatenate((zeros(1, 3), u))
                ZZ = concatenate((ones(len(sol.rows), 1), ZZ_1), 1)
                ZZ0 = linalg.inv(ZZ[tryrows1, :]) * (d2n[tryrows1, 1] - zz[tryrows1, 1])
                xx = linalg.inv(ZZ[tryrows1, :]) * (d2n[tryrows1, 1] - zz[tryrows1, 1])
                a = (zz[okrows] + ZZ[:, okrows] * xx)
                b = d2n[okrows]
                inlids = where(abs(b - a) < sys.ransac_threshold2)
                if len(inlids) < maxnrinl:
                    maxnrinl = len(inlids)
                    tmpsol = structtype()
                    tmpsol.rows = sol.rows[tryrows1]
                    tmpsol.col = ii
                    tmpsol.Bhatn = ZZ0 * xx
                    tmpsol.inlrows = sol.rows[okrows[inlids]]
        if maxnrinl > sys.min_inliers2:
            sol.cols = concatenate((sol.cols, tmpsol.col), 1)
            sol.inlmatrix[tmpsol.inlrows, tmpsol.col] = ones(len(tmpsol.inlrows), 1)
            sol.Bhat = concatenate((sol.Bhat, tmpsol.Bhatn), 1)
            sol.dl = compactionmatrix(len(sol.cols))
    return sol
def mixture_distribution(ys, w):
    """ Sampling from mixture distribution.

    The samples are generated from the given samples of the individual
    distributions and the mixing weights.

    Parameters
    ----------
    ys : tuple of ndarrays
         ys[i]: samples from i^th distribution, ys[i][j,:]: j^th sample
         from the i^th distribution. Requirement: the samples (ys[i][j,:])
         have the same dimensions (for all i, j).
    w : vector, w[i] > 0 (for all i), sum(w) = 1
        Mixing weights. Requirement: len(y) = len(w).
    """

    # verification:
    if sum(w) != 1:
        raise Exception('sum(w) has to be 1!')

    if not(all(w > 0)):
        raise Exception('The coordinates of w have to be positive!')

    if len(w) != len(ys):
        raise Exception('len(w)=len(ys) has to hold!')

    # number of samples, dimensions:
    num_of_samples_v = array([y.shape[0] for y in ys])
    dim_v = array([y.shape[1] for y in ys])
    if len(set(dim_v)) != 1:  # test if all the dimensions are identical
        raise Exception('All the distributions in ys need to have the ' +
                        'same dimensionality!')

    # take the maximal number of samples (t) for which 't*w1<=t1, ...,
    # t*wM<=tM', then tm:=floor(t*wm), i.e. compute the trimmed number of
    # samples:
    t = min(num_of_samples_v / w)
    tw = tuple(int(e) for e in floor(t * w))

    # mix ys[i]-s:
    num_of_samples = sum(tw)
    mixture = zeros((num_of_samples, dim_v[0]))
    idx_start = 0
    for k in range(len(ys)):
        tw_k = tw[k]
        idx_stop = idx_start + tw_k
        # trim the 'irrelevant' part, the result is added to the mixture:
        mixture[idx_start:idx_stop] = ys[k][:tw_k]  # broadcasting
        idx_start = idx_stop

    # permute the samples to obtain the mixture (the weights have been
    # taken into account in the trimming part):
    mixture = permutation(mixture)  # permute along the first dimension

    return mixture
def _temperature_swaps(self, p, lnprob, logl):
    """
    Perform parallel-tempering temperature swaps on the state in ``p``
    with associated ``lnprob`` and ``logl``.
    """
    ntemps = self.ntemps

    for i in range(ntemps - 1, 0, -1):
        bi = self.betas[i]
        bi1 = self.betas[i - 1]

        dbeta = bi1 - bi

        iperm = nr.permutation(self.nwalkers)
        i1perm = nr.permutation(self.nwalkers)

        raccept = np.log(nr.uniform(size=self.nwalkers))
        paccept = dbeta * (logl[i, iperm] - logl[i - 1, i1perm])

        self.nswap[i] += self.nwalkers
        self.nswap[i - 1] += self.nwalkers

        asel = (paccept > raccept)
        nacc = np.count_nonzero(asel)

        self.nswap_accepted[i] += nacc
        self.nswap_accepted[i - 1] += nacc

        ptemp = np.copy(p[i, iperm[asel], :])
        ltemp = np.copy(logl[i, iperm[asel]])
        prtemp = np.copy(lnprob[i, iperm[asel]])

        p[i, iperm[asel], :] = p[i - 1, i1perm[asel], :]
        logl[i, iperm[asel]] = logl[i - 1, i1perm[asel]]
        lnprob[i, iperm[asel]] = lnprob[i - 1, i1perm[asel]] \
            - dbeta * logl[i - 1, i1perm[asel]]

        p[i - 1, i1perm[asel], :] = ptemp
        logl[i - 1, i1perm[asel]] = ltemp
        lnprob[i - 1, i1perm[asel]] = prtemp + dbeta * ltemp

    return p, lnprob, logl
def pool_exec(workload, processes=-1):
    global PROGRESS_CTR, PROGRESS_CTR_T, PROGRESS_CTR_T_C, PROGRESS_CTR_POLICY, WORKLOAD_LEN
    if processes == -1:
        processes = SIM_PROCESSES

    if RUN_SIM_ON_REDIS:
        pool = Pool(processes=12)
        result = pool.map(run_sim_make, [(ident,) + w for (ident, w) in enumerate(workload)])
        pool.close()
    else:
        sys.stderr.write("[ starting ]")
        sys.stderr.flush()

        PROGRESS_CTR = Value('i', 0)
        PROGRESS_CTR_T = Array('i', [0 for i in range(processes)])
        PROGRESS_CTR_POLICY = Array('i', [0 for i in range(processes)])
        PROGRESS_CTR_T_C = Array('i', [0 for i in range(processes)])
        WORKLOAD_LEN = len(workload)

        if (processes == 1):
            result = []
            for ix, w in enumerate(workload):
                result.append(run_sim_make(w))
                update_progress_bar(ix + 1, len(workload), threads=[])
            sys.stderr.write("\n")
            sys.stderr.flush()
            return result

        pool = Pool(processes=processes, maxtasksperchild=1)
        workload_permuted, permutation = permute_list(workload)
        result_async = pool.map_async(run_sim_make, workload_permuted, chunksize=1)
        total_work = len(workload)

        while not result_async.ready():
            result_async.wait(15)
            value = PROGRESS_CTR.value
            update_progress_bar(PROGRESS_CTR.value, total_work,
                                threads=PROGRESS_CTR_T, policies=PROGRESS_CTR_POLICY)

        sys.stderr.write("\n")
        sys.stderr.flush()

        result = unpermute_list(result_async.get(), permutation)
        pool.close()

    return result
def make_fitter(vlb, X, callback=None, load_data=True):
    N, xdim = X.shape

    # load all data onto the gpu at once... ideally
    if load_data:
        X_all = tf.constant(X, name='X')

    def fit(num_epochs, minibatch_size, L, optimizer, sess):
        num_batches = N // minibatch_size

        # set up cost function and updates
        if load_data:
            idx = tf.placeholder(tf.int32, name='idx')
            mbsize = tf.constant(minibatch_size)
            xdimsize = tf.constant(xdim)
            x_batch = tf.slice(X_all, tf.pack([idx * mbsize, 0]),
                               tf.pack([mbsize, xdimsize]), name='x_batch')
        else:
            x_batch = tf.placeholder(tf.float32, shape=[minibatch_size, xdim], name='X')

        cost = -tf.reduce_mean(vlb(x_batch, L)) * N
        train_step = optimizer.minimize(cost)
        sess.run(tf.initialize_variables(ut.nontrainable_variables()))

        def train(bidx):
            if load_data:
                train_step.run(feed_dict={idx: bidx}, session=sess)
                return cost.eval(feed_dict={idx: bidx}, session=sess)
            else:
                xb = X[bidx * minibatch_size:(bidx + 1) * minibatch_size]
                train_step.run(feed_dict={x_batch: xb}, session=sess)
                return cost.eval(feed_dict={x_batch: xb}, session=sess)

        start = time()
        for i in xrange(num_epochs):
            bidxs = npr.permutation(num_batches)
            vals = [train(bidx) for bidx in pyprind.prog_bar(bidxs)]
            print 'epoch {:>4} of {:>4}: {:> .6}'.format(i + 1, num_epochs, np.median(vals[-10:]))
            if callback:
                callback(i)

        # will tell you what nodes are being added
        #tf.get_default_graph().finalize()
        stop = time()
        print 'cost {}, {:>5} sec per update, {:>5} sec total\n'.format(
            np.median(vals[-10:]), (stop - start) / N, stop - start)

    return fit


#########################
#  objective functions  #
#########################
def testNaiveBayesToSpamEmail():
    """Test the Naive Bayes classifier on the spam email dataset."""
    emails = []
    emails_class = []
    for i in range(1, 26):
        # spam emails
        words = getContentTokens(open('email/spam/%d.txt' % i).read())
        emails.append(words)
        emails_class.append(1)
        # ham emails
        words = getContentTokens(open('email/ham/%d.txt' % i).read())
        emails.append(words)
        emails_class.append(0)

    # hold-out cross validation -- randomly pick part of the data as the
    # test set and use the rest for training
    random_order = random.permutation(50)
    testIndexs, trainIndexs = random_order[:10], random_order[10:]

    # build the vocabulary
    vocabulary = getVocabulary(emails)

    # build the training matrix
    trainMatrix = []
    trainCategories = []
    for docIndex in trainIndexs:
        trainMatrix.append(
            getBagOfWords2Vec(vocabulary, emails[docIndex])  # bag-of-words vector
        )
        trainCategories.append(emails_class[docIndex])
    logging.info('Train dataset is ready.')

    model = NaiveBayesModel(trainMatrix, trainCategories)
    logging.info('NaiveBayes model is trained.')

    # test the classifier
    errorCount = 0
    for docIndex in testIndexs:
        wordVector = getBagOfWords2Vec(vocabulary, emails[docIndex])
        result = model.predict(wordVector)
        if result != emails_class[docIndex]:
            errorCount += 1
            logging.warning('classification error. Predict/Actual: {}/{}\n{}'.format(
                result, emails_class[docIndex], ' '.join(emails[docIndex])
            ))
    logging.info('the error rate is: {:.2%}'.format(1.0 * errorCount / len(testIndexs)))
def getLocalWords(feed1, feed0):
    summaries = []
    summaries_class = []
    fullText = []
    minLen = min(
        len(feed1['entries']),
        len(feed0['entries'])
    )
    for i in range(minLen):
        # first feed, e.g. New York
        wordList = getContentTokens(feed1['entries'][i]['summary'])
        summaries.append(wordList)
        fullText.extend(wordList)
        summaries_class.append(1)
        # second feed
        wordList = getContentTokens(feed0['entries'][i]['summary'])
        summaries.append(wordList)
        fullText.extend(wordList)
        summaries_class.append(0)
    vocabulary = getVocabulary(summaries)

    # stop words -- remove frequent, uninformative words from the vocabulary;
    # a full stop-word list is available at www.ranks.nl/resources/stopwords.html;
    # here we simply drop the top-N most frequent words
    topN = 30
    topNWords = calcMostFreq(vocabulary, fullText, topN)
    for word, _count in topNWords:
        if word in vocabulary:
            vocabulary.remove(word)

    # randomly split into test and training sets
    random_order = random.permutation(2 * minLen)
    testIndexs, trainIndexs = random_order[:20], random_order[20:]

    # build the training matrix
    trainMatrix = []
    trainCategories = []
    for docIndex in trainIndexs:
        trainMatrix.append(getBagOfWords2Vec(vocabulary, summaries[docIndex]))
        trainCategories.append(summaries_class[docIndex])
    model = NaiveBayesModel(trainMatrix, trainCategories)

    # test the classifier
    errorCount = 0
    for docIndex in testIndexs:
        wordVector = getBagOfWords2Vec(vocabulary, summaries[docIndex])
        result = model.predict(wordVector)
        if result != summaries_class[docIndex]:
            errorCount += 1
            logging.warning('[classification error] Predict/Actual: {}/{}\n{}'.format(
                result, summaries_class[docIndex], ' '.join(summaries[docIndex])
            ))
    logging.info('[error rate] {:.2%}'.format(1.0 * errorCount / len(testIndexs)))
    return vocabulary, model.pWordsVector