我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用numpy.random.RandomState()。
def test_specified_rng(): from npdl.utils.random import get_rng from npdl.utils.random import set_rng from npdl.initializations import Normal from npdl.initializations import Uniform from npdl.initializations import GlorotNormal from npdl.initializations import GlorotUniform from numpy.random import RandomState from numpy import allclose shape = (10, 20) seed = 12345 rng = get_rng() for test_cls in [Normal, Uniform, GlorotNormal, GlorotUniform]: set_rng(RandomState(seed)) sample1 = test_cls().call(shape) set_rng(RandomState(seed)) sample2 = test_cls().call(shape) # reset to original RNG for other tests set_rng(rng) assert allclose(sample1, sample2), \ "random initialization was inconsistent " \ "for {}".format(test_cls.__name__)
def gaussian_perturbation(amps, rng): """ Create gaussian noise tensor with same shape as amplitudes. Parameters ---------- amps: ndarray Amplitudes. rng: RandomState Random generator. Returns ------- perturbation: ndarray Perturbations to add to the amplitudes. """ perturbation = rng.randn(*amps.shape).astype(np.float32) return perturbation
def fake_tetrahedral_ds(): from yt.frontends.stream.api import load_unstructured_mesh from yt.frontends.stream.sample_data.tetrahedral_mesh import \ _connectivity, _coordinates prng = RandomState(0x4d3d3d3) # the distance from the origin node_data = {} dist = np.sum(_coordinates**2, 1) node_data[('connect1', 'test')] = dist[_connectivity] # each element gets a random number elem_data = {} elem_data[('connect1', 'elem')] = prng.rand(_connectivity.shape[0]) ds = load_unstructured_mesh(_connectivity, _coordinates, node_data=node_data, elem_data=elem_data) return ds
def fake_hexahedral_ds(): from yt.frontends.stream.api import load_unstructured_mesh from yt.frontends.stream.sample_data.hexahedral_mesh import \ _connectivity, _coordinates prng = RandomState(0x4d3d3d3) # the distance from the origin node_data = {} dist = np.sum(_coordinates**2, 1) node_data[('connect1', 'test')] = dist[_connectivity-1] # each element gets a random number elem_data = {} elem_data[('connect1', 'elem')] = prng.rand(_connectivity.shape[0]) ds = load_unstructured_mesh(_connectivity-1, _coordinates, node_data=node_data, elem_data=elem_data) return ds
def omap(fun, arglist): print banner("Starting omap") N_tasks = len(arglist) jobname = str(npr.RandomState().randint(10**12)) working_dir = path.join(root_working_dir, jobdir(jobname)) module_path = path.join(os.getcwd(), inspect.getsourcefile(fun)) module_name = inspect.getmodulename(module_path) run_signal_path = path.join('..', run_signal(jobname)) fun_name = fun.__name__ slurm_str = slurm_template.format(jobname=jobname, N_tasks=N_tasks, other_options=slurm_options, module_name=module_name, fun_name=fun_name) with temp_dir(working_dir): shutil.copy(module_path, ".") with open(arg_fname, 'w') as f: pickle.dump(arglist, f) with open(slurm_fname, 'w') as f: f.write(slurm_str) with open(run_signal_path, 'w'): pass print "Submitting {0} tasks (output in {1})".format(N_tasks, working_dir) while path.exists(run_signal_path): time.sleep(1) print "Tasks submitted" return collect_results(jobname)
def test_learn_xor(self): """" Stupid test which gains code coverage """ instance_prng = RandomState(seed=TestLowDegree.seed_instance) instance = LTFArray( weight_array=LTFArray.normal_weights( TestLowDegree.n, TestLowDegree.k, random_instance=instance_prng ), transform=LTFArray.transform_id, combiner=LTFArray.combiner_xor, ) low_degree_learner = LowDegreeAlgorithm( TrainingSet(instance=instance, N=TestLowDegree.N), degree=TestLowDegree.degree ) low_degree_learner.learn()
def test_training_set_challenges(self): """The TrainingSet should generate the same challenges for equal seeds.""" n = 8 k = 1 transformation = LTFArray.transform_id combiner = LTFArray.combiner_xor N = 1000 instance_prng = RandomState(0x4EFEA) weight_array = LTFArray.normal_weights(n, k, random_instance=instance_prng) instance = LTFArray( weight_array=weight_array, transform=transformation, combiner=combiner, ) challenge_seed = 0xAB17D training_set_1 = TrainingSet(instance=instance, N=N, random_instance=RandomState(challenge_seed)) training_set_2 = TrainingSet(instance=instance, N=N, random_instance=RandomState(challenge_seed)) self.assertTrue( array_equal(training_set_1.challenges, training_set_2.challenges), 'The challenges are not equal.', )
def sample_inputs(n, num, random_instance=RandomState()): """ This function generates an iterator for either random samples of {-1,1}-vectors of length `n` if `num` < 2^n, and an iterator for all {-1,1}-vectors of length `n` otherwise. Note that we return only 2^n vectors even with `num` > 2^n. In other words, the output of this function is deterministic if and only if num >= 2^n. :param n: int Length of a n bit vector :param num: int Number of n bit vector :param random_instance: numpy.random.RandomState The PRNG which is used to generate the arrays. :return: iterator of num {-1,1} int arrays An iterator with num random {-1,1} int arrays depending on num and n. """ return random_inputs(n, num, random_instance) if num < 2 ** n else all_inputs(n)
def approx_dist(instance1, instance2, num, random_instance=RandomState()): """ Approximate the distance of two functions instance1, instance2 by evaluating instance1 random set of inputs. instance1, instance2 needs to have eval() method and input_length member. :param instance1: pypuf.simulation.arbiter_based.base.Simulation :param instance2: pypuf.simulation.arbiter_based.base.Simulation :param num: int Number of n bit vector :param random_instance: numpy.random.RandomState The PRNG which is used to generate the input arrays. :return: float Probability (randomly uniform x) for instance1.eval(x) != instance2.eval(x) """ assert instance1.n == instance2.n inputs = array(list(random_inputs(instance1.n, num, random_instance))) return (num - count_nonzero(instance1.eval(inputs) == instance2.eval(inputs))) / num
def approx_stabilities(instance, num, reps, random_instance=RandomState()): """ This function approximates the stability of the given `instance` for `num` challenges evaluating it `reps` times per challenge. The stability is the probability that the instance gives the correct response when evaluated. :param instance: pypuf.simulation.base.Simulation The instance for the stability approximation :param num: int Amount of challenges to be evaluated :param reps: int Amount of repetitions per challenge :return: array of float Array of the stabilities for each challenge """ challenges = sample_inputs(instance.n, num, random_instance) responses = zeros((reps, num)) for i in range(reps): challenges, unpacked_challenges = itertools.tee(challenges) responses[i, :] = instance.eval(array(list(unpacked_challenges))) return 0.5 + 0.5 * np_abs(np_sum(responses, axis=0)) / reps
def calculate_stabilities(self, instance, challenge_prng): """ Calculate the stability for random chosen challenges. :param instance: SimulationMajorityLTFArray A simulation of a Majority Vote Arbiter PUF. :param challenge_prng: RandomState Pseudo-random number generator which is used to generate challenges. """ challenges = np.array(list(tools.random_inputs(self.n, self.N, random_instance=challenge_prng))) eval_array = np.zeros(len(challenges)) # Evaluation of the PUF in order to measure the stability for i in range(self.iterations): eval_array = eval_array + instance.eval(challenges) # Calculation of the stability for every challenge stab_array = (np.abs(eval_array) + self.iterations) / (2 * self.iterations) # Number which counts the satisfying challenges num_goal_fulfilled = 0 # Check of the desired_stability for i in range(self.N): if stab_array[i] >= self.desired_stability: num_goal_fulfilled += 1 # Relative frequency self.overall_stab = num_goal_fulfilled / self.N
def transform_random(challenges, k): """ This input transformation chooses for each Arbiter Chain an random challenge based on the initial challenge. :param challenges: array of int shape(N,n) Array of challenges which should be evaluated by the simulation. :param k: int Number of LTFArray PUFs :return: array of int shape(N,k,n) Array of transformed challenges. """ N = len(challenges) n = len(challenges[0]) vtransform_to_01 = vectorize(tools.transform_challenge_11_to_01) cs_01 = array([vtransform_to_01(c) for c in challenges]) result = array([RandomState(c).choice((-1, 1), (k, n)) for c in cs_01]) assert result.shape == (N, k, n), 'The resulting challenges have not the desired shape.' return result
def __init__(self, n_user, n_item, n_feature, reg=1e-2, converge=1e-5, seed=None, max_rating=None, min_rating=None): super(ALS, self).__init__() self.n_user = n_user self.n_item = n_item self.n_feature = n_feature self.reg = float(reg) self.rand_state = RandomState(seed) self.max_rating = float(max_rating) if max_rating is not None else None self.min_rating = float(min_rating) if min_rating is not None else None self.converge = converge # data state self.mean_rating_ = None self.ratings_csr_ = None self.ratings_csc_ = None # user/item features self.user_features_ = 0.1 * self.rand_state.rand(n_user, n_feature) self.item_features_ = 0.1 * self.rand_state.rand(n_item, n_feature)
def test_light_cone(): prng = RandomState(0x4d3d3d3) ds = data_dir_load(etc) A = 2000. exp_time = 1.0e5 fov = (0.5, "deg") lc = XrayLightCone('%s/32Mpc_32.enzo' % etc[:-14], 'Enzo', 0., 0.1, seed=24) source_model = ThermalSourceModel("apec", 0.1, 10.0, 1000, prng=prng) events = lc.generate_events(A, exp_time, fov, source_model, (30.0, 45.0), absorb_model="wabs", nH=0.02, smooth_positions=0.5, prng=prng) return_events = return_data(events.events) test = GenericArrayTest(ds, return_events, args=["events"]) test_light_cone.__name__ = test.description yield test
def view_voltages(data,title=None, shuffle=False, shuffle_seed=1, vmin=-70, vmax=35, s_per_step=None): """ Show a complete simulation run in an (M*N)xT trace image. Args: data: MxNxT array of voltage traces title: figure title shuffle: If true, shuffle the order of cells in the trace image. s_per_step: seconds per step - if given, display a proper time axis """ plt.figure(figsize=fig_size) vtraces = data.reshape(-1,data.shape[2])[:] if shuffle: rng = RandomState(shuffle_seed) vtraces = rng.permutation(vtraces) if s_per_step is None: T = data.shape[-1] else: T = data.shape[-1] * s_per_step plt.imshow(vtraces, cmap='bone', vmin=vmin, vmax=vmax, aspect='auto', interpolation='nearest', extent=[0, T, vtraces.shape[0], 0]) plt.colorbar() if title: plt.title(title) plt.show()
def diffuse_stimulus(network,inputc,dist,seed=1): """ move each input-receiving node along a random walk of specified distance. Args: network: networkx graph inputc: MxN array, nonzero positions are treated as input-receiving ndes dist: distance Returns: An MxN array of zeros, with the scrambled input population := 1 """ rng = RandomState(seed) inputnodes = [tuple(n) for n in np.transpose(inputc.nonzero())] diffused_nodes = [] for node in inputnodes: d=0 while d<dist: neigh = network.neighbors(node) node_ = neigh[rng.randint(0,len(neigh))] d = d+1 diffused_nodes.append(node_) return ut.idxtoimg(inputc.shape[0],inputc.shape[1],diffused_nodes)
def test_random_state(): import numpy.random as npr # Check with seed state = com._random_state(5) assert_equal(state.uniform(), npr.RandomState(5).uniform()) # Check with random state object state2 = npr.RandomState(10) assert_equal( com._random_state(state2).uniform(), npr.RandomState(10).uniform()) # check with no arg random state assert isinstance(com._random_state(), npr.RandomState) # Error for floats or strings with tm.assertRaises(ValueError): com._random_state('test') with tm.assertRaises(ValueError): com._random_state(5.5)
def test_group_var_generic_1d(self): prng = RandomState(1234) out = (np.nan * np.ones((5, 1))).astype(self.dtype) counts = np.zeros(5, dtype='int64') values = 10 * prng.rand(15, 1).astype(self.dtype) labels = np.tile(np.arange(5), (3, )).astype('int64') expected_out = (np.squeeze(values) .reshape((5, 3), order='F') .std(axis=1, ddof=1) ** 2)[:, np.newaxis] expected_counts = counts + 3 self.algo(out, counts, values, labels) np.testing.assert_allclose(out, expected_out, self.rtol) tm.assert_numpy_array_equal(counts, expected_counts)
def test_group_var_generic_2d_some_nan(self): prng = RandomState(1234) out = (np.nan * np.ones((5, 2))).astype(self.dtype) counts = np.zeros(5, dtype='int64') values = 10 * prng.rand(10, 2).astype(self.dtype) values[:, 1] = np.nan labels = np.tile(np.arange(5), (2, )).astype('int64') expected_out = np.vstack([values[:, 0] .reshape(5, 2, order='F') .std(ddof=1, axis=1) ** 2, np.nan * np.ones(5)]).T expected_counts = counts + 2 self.algo(out, counts, values, labels) np.testing.assert_allclose(out, expected_out, self.rtol) tm.assert_numpy_array_equal(counts, expected_counts)
def randomise_dataframe_rows(self, df): """ Randomise ordering of DataFrame. Return a NumPy array of shuffled index values using `np.random.permutation` Return a new Dataframe containing the shuffled order using `loc[]` `seed(1)` reproduces random same results when share and run same code by others """ if isinstance(df, type(None)): return None # np.random.seed(0) # return df.loc[np.random.permutation(len(df))] prng = RandomState(1234567890) return df.loc[prng.permutation(len(df))] # Alternative Approach: # shuffled_index = np.random.permutation(df.index) # return df.reindex(shuffled_index)
def check_function(self, function, sz): from threading import Thread out1 = np.empty((len(self.seeds),) + sz) out2 = np.empty((len(self.seeds),) + sz) # threaded generation t = [Thread(target=function, args=(np.random.RandomState(s), o)) for s, o in zip(self.seeds, out1)] [x.start() for x in t] [x.join() for x in t] # the same serial for s, o in zip(self.seeds, out2): function(np.random.RandomState(s), o) # these platforms change x87 fpu precision mode in threads if np.intp().dtype.itemsize == 4 and sys.platform == "win32": assert_array_almost_equal(out1, out2) else: assert_array_equal(out1, out2)
def __init__(self, model, random=False, probability=.99, threshold=float('inf'), max_iterations=1000): self._sac_model = model self.probability = probability self.distance_threshold = threshold self.max_iterations = max_iterations self._model = [] self._inliers = [] self._model_coefficients = None if random: self._rng = RandomState() else: self._rng = RandomState(12345)
def __init__(self, cloud, indices=None, random=False): super().__init__(cloud, indices=None) self._radius_limits = (-float('inf'), float('inf')) # use tuple to represent interval # XXX: Whether it should be a private member self.samples_max_dist = 0 # samples_radius_ self._samples_radius_search = None self._error_sqr_dists = [] # The maximum number of samples to try until we get a good one self._max_sample_checks = 1000 # essential fields self._sample_size = 0 self._model_size = 0 if random: self._rng = RandomState() else: self._rng = RandomState(12345)
def findAnchors(Q, K, params, candidates): # row normalize Q row_sums = Q.sum(axis=1) for i in xrange(len(Q[:, 0])): Q[i, :] = Q[i, :]/float(row_sums[i] + 1e-100) # Reduced dimension random projection method for recovering anchor words if params.lowerDim is None or params.lowerDim >= Q.shape[1]: Q_red = Q.copy() else: # Random number generator for generating dimension reduction prng_W = RandomState(params.seed) Q_red = rp.Random_Projection(Q.T, params.lowerDim, prng_W) Q_red = Q_red.T (anchors, anchor_indices) = gs.Projection_Find(Q_red, K, candidates) # restore the original Q for i in xrange(len(Q[:, 0])): Q[i, :] = Q[i, :]*float(row_sums[i]) return anchor_indices
def test_qtl_normal_scan(): random = RandomState(2) N = 200 G = random.randn(N, N + 100) G = stdnorm(G, 0) G /= sqrt(G.shape[1]) p = 2 X = random.randn(N, p) X = stdnorm(X, 0) X /= sqrt(X.shape[1]) u1 = random.randn(N + 100) / sqrt(N + 100) u2 = random.randn(p) / sqrt(p) y = dot(G, u1) + dot(X, u2) qtl = scan(NormalPhenotype(y), X, G=G, progress=False, fast=False) assert_allclose( qtl.pvalues(), [ 4.742418e-001, 5.094706e-167 ], rtol=1e-5)
def test_qtl_normal_scan_covariate_redundance(): random = RandomState(2) N = 50 G = random.randn(N, N + 100) G = stdnorm(G, 0) G /= sqrt(G.shape[1]) p = 5 X = random.randn(N, p) X = stdnorm(X, 0) X /= sqrt(X.shape[1]) u1 = random.randn(N + 100) / sqrt(N + 100) u2 = random.randn(p) / sqrt(p) y = dot(G, u1) + dot(X, u2) X[:] = 1 qtl = scan(NormalPhenotype(y), X, G=G, progress=False, fast=False) assert_allclose(qtl.pvalues(), [1] * p)
def test_qtl_poisson_scan(): random = RandomState(9) N = 200 G = random.randn(N, N + 100) G = stdnorm(G, 0) G /= sqrt(G.shape[1]) p = 2 X = random.randn(N, p) X = stdnorm(X, 0) X /= sqrt(X.shape[1]) noccurrences = poisson( -0.1, G, causal_variants=X, causal_variance=0.1, random_state=random) qtl = scan(PoissonPhenotype(noccurrences), X, G=G, progress=False, fast=False) assert_allclose( qtl.pvalues(), [ 0.8163571597, 0.0849437877 ], rtol=1e-2)
def test_qtl_bernoulli_scan(): random = RandomState(9) N = 500 G = random.randn(N, N + 100) G = stdnorm(G, 0) G /= sqrt(G.shape[1]) p = 2 X = random.randn(N, p) X = stdnorm(X, 0) X /= sqrt(X.shape[1]) outcome = bernoulli( -0.1, G, causal_variants=X, causal_variance=0.1, random_state=random) qtl = scan(BernoulliPhenotype(outcome), X, G=G, progress=False, fast=False) assert_allclose( qtl.pvalues(), [ 0.27762911, 0.11432954 ], rtol=1e-4)
def test_stdnorm(): random = RandomState(38943) x = random.randn(10) X = random.randn(10, 5) x = stdnorm(x) X = stdnorm(X, 0) assert_allclose(x.mean(0), [0], atol=1e-7) assert_allclose(x.std(0), 1, atol=1e-7) assert_allclose(X.mean(0), [0] * 5, atol=1e-7) assert_allclose(X.std(0), [1] * 5, atol=1e-7) x = ones(10) X = random.randn(10, 5) X[:, 0] = 1 assert_allclose(stdnorm(x).mean(0), [0]) assert_allclose(stdnorm(x).std(0), [0])
def __init__(self, num_examples, means=None, variances=None, priors=None, **kwargs): rng = kwargs.pop('rng', None) if rng is None: seed = kwargs.pop('seed', 0) rng = np.random.RandomState(seed) gaussian_mixture = GMM_distribution(means=means, variances=variances, priors=priors, rng=rng) self.means = gaussian_mixture.means self.variances = gaussian_mixture.variances self.priors = gaussian_mixture.priors features, labels = gaussian_mixture.sample(nsamples=num_examples) densities = gaussian_mixture.pdf(x=features) data ={'samples': features, 'label': labels, 'density': densities} self.data = data
def _iter_fast(self, ds, batch_size, start=None, end=None, shuffle=True, seed=None): # craete random seed prng1 = None prng2 = _dummy_shuffle if shuffle: if seed is None: seed = get_random_magic_seed() prng1 = RandomState(seed) prng2 = RandomState(seed) batches = create_batch(ds.shape[0], batch_size, start, end, prng1) prng2.shuffle(batches) for i, j in batches: data = ds[i:j] yield self._normalizer(data[prng2.permutation(data.shape[0])])
def state(self): """ Make a new RandomState from our seed. This ensures that every call to _*_values produces the same output every time for a given SeededRandomLoader instance. """ return RandomState(self._seed)
def test_scalar(self): s = np.random.RandomState(0) assert_equal(s.randint(1000), 684) s = np.random.RandomState(4294967295) assert_equal(s.randint(1000), 419)
def test_array(self): s = np.random.RandomState(range(10)) assert_equal(s.randint(1000), 468) s = np.random.RandomState(np.arange(10)) assert_equal(s.randint(1000), 468) s = np.random.RandomState([0]) assert_equal(s.randint(1000), 973) s = np.random.RandomState([4294967295]) assert_equal(s.randint(1000), 265)
def test_invalid_scalar(self): # seed must be an unsigned 32 bit integer assert_raises(TypeError, np.random.RandomState, -0.5) assert_raises(ValueError, np.random.RandomState, -1)
def test_invalid_array(self): # seed must be an unsigned 32 bit integer assert_raises(TypeError, np.random.RandomState, [-0.5]) assert_raises(ValueError, np.random.RandomState, [-1]) assert_raises(ValueError, np.random.RandomState, [4294967296]) assert_raises(ValueError, np.random.RandomState, [1, 2, 4294967296]) assert_raises(ValueError, np.random.RandomState, [1, -2, 4294967296])
def setUp(self): self.seed = 1234567890 self.prng = random.RandomState(self.seed) self.state = self.prng.get_state()
def setRandomCenters(self, dim, weight, seed): """ Set the initial centres to be random samples from a gaussian population with constant weights. """ rng = random.RandomState(seed) clusterCenters = rng.randn(self._k, dim) clusterWeights = tile(weight, self._k) self._model = StreamingKMeansModel(clusterCenters, clusterWeights) return self
def set_seed(n): global seed, py_rng, np_rng, t_rng seed = n py_rng = Random(seed) np_rng = RandomState(seed) t_rng = RandomStreams(seed)
def set_seed(n): global seed, py_rng, np_rng, t_rng print('set seed = %d' % n) seed = n py_rng = Random(seed) np_rng = RandomState(seed) t_rng = RandomStreams(seed)
def __init__(self, batch_size): self.batch_size = batch_size self.rng = RandomState(328774)
def reset_rng(self): self.rng = RandomState(328774)
def __init__(self, batch_size, input_time_length, n_preds_per_input, seed=(2017, 6, 28)): self.batch_size = batch_size self.input_time_length = input_time_length self.n_preds_per_input = n_preds_per_input self.seed = seed self.rng = RandomState(self.seed)
def reset_rng(self): self.rng = RandomState(self.seed)
def test_MACD_window_length_generation(self, seed): rng = RandomState(seed) signal_period = rng.randint(1, 90) fast_period = rng.randint(signal_period + 1, signal_period + 100) slow_period = rng.randint(fast_period + 1, fast_period + 100) ewma = MovingAverageConvergenceDivergenceSignal( fast_period=fast_period, slow_period=slow_period, signal_period=signal_period, ) assert_equal( ewma.window_length, slow_period + signal_period - 1, )
def walk_from_these_directions(self, scale_size: int, prng: RandomState): new_indices = directions_to_walk(self.indices, scale_size, prng) return Pattern(self.name, self.bars, new_indices, self.velocity, self.duration, self.repeatable, self.real_time) # Returns the pattern using sounds, by walking a scale with the scale # walker.
def restrict_pattern_set(patterns, amount: float, prng: RandomState): if not 0.0 < amount <= 1.0: return [] count = int(ceil(len(patterns) * amount)) if count == len(patterns): return patterns return prng.choice(patterns, replace=False, size=count)