我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 random.uniform()。
def __call__(self, img):
    """Randomly crop ``img`` and resize the crop to a ``self.size`` square.

    Up to ten attempts are made to draw a crop covering 90-100% of the
    image area with an aspect ratio between 7/8 and 8/7 (width and height
    are swapped half of the time).  The first crop that fits inside the
    image is resized with ``self.interpolation`` and returned.  When no
    attempt fits, the image is passed through ``Scale`` and then
    ``CenterCrop`` instead.
    """
    for _ in range(10):
        full_w = img.size[0]
        full_h = img.size[1]

        wanted_area = random.uniform(0.9, 1.) * (full_w * full_h)
        ratio = random.uniform(7. / 8, 8. / 7)

        w = int(round(math.sqrt(wanted_area * ratio)))
        h = int(round(math.sqrt(wanted_area / ratio)))
        if random.random() < 0.5:
            w, h = h, w

        # Reject candidates that do not fit inside the image and retry.
        if w > full_w or h > full_h:
            continue

        left = random.randint(0, full_w - w)
        top = random.randint(0, full_h - h)
        patch = img.crop((left, top, left + w, top + h))
        assert (patch.size == (w, h))
        return patch.resize((self.size, self.size), self.interpolation)

    # Fallback
    rescale = Scale(self.size, interpolation=self.interpolation)
    center = CenterCrop(self.size)
    return center(rescale(img))
def test_get_many():
    """``get_many`` returns a generator of correctly typed items equal to the queried value."""
    source = IntFloatDataSource()

    # Integers are drawn and checked first, then floats, one batch at a time.
    for wanted_type, draw in ((int, random.randint), (float, random.uniform)):
        samples = [draw(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
        for sample in samples:
            found = source.get_many(wanted_type, {VALUE_KEY: sample, COUNT_KEY: VALUES_COUNT})
            assert type(found) is GENERATOR_CLASS
            for item in found:
                assert type(item) is wanted_type
                assert item == sample
def serviceA(context=None):
    """Run worker "A": serve ``Service A`` requests until told to stop.

    A DEALER socket with identity ``b'A'`` is connected to
    ``tcp://localhost:5560``.  Every received message is printed under
    ``myLock``.  A ``Service A`` request is answered after a random pause
    of up to half a second, ``END`` stops the loop, and any other message
    is reported as an identity mix-up and also stops the loop.

    Messages are compared against bytes literals and printed with
    single-argument ``print(...)`` calls, so the function behaves the same
    on Python 2 and also works on Python 3, where ``recv()`` returns bytes.

    :param context: optional zmq context to reuse; the shared instance is
        used when omitted.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)

    # identify worker
    service.setsockopt(zmq.IDENTITY, b'A')
    service.connect("tcp://localhost:5560")

    while True:
        message = service.recv()
        with myLock:
            print("Service A got:")
            print(message)
        if message == b"Service A":
            # do some work
            time.sleep(random.uniform(0, 0.5))
            service.send(b"Service A did your laundry")
        elif message == b"END":
            break
        else:
            with myLock:
                print("the server has the wrong identities!")
            break
def client_proc(computation, njobs, task=None):
    """Task that runs ``njobs`` ``compute`` jobs and prints each result.

    Each job receives an ``(index, random float in [2, 5])`` argument tuple.
    """
    # schedule computation with the scheduler; scheduler accepts one computation
    # at a time, so if scheduler is shared, the computation is queued until it
    # is done with already scheduled computations
    schedule_failed = yield computation.schedule()
    if schedule_failed:
        raise Exception('Could not schedule computation')

    # arguments must correspond to arguments for computation; multiple arguments
    # (as in this case) can be given as tuples
    job_args = [(job_no, random.uniform(2, 5)) for job_no in range(njobs)]
    outcomes = yield computation.run_results(compute, job_args)

    # Tasks may not be executed in the order of given list of args, but
    # results would be in the same order of given list of args
    for outcome in outcomes:
        print(' result for %d from %s: %s' % outcome)

    # wait for all jobs to be done and close computation
    yield computation.close()
def client_proc(computation, task=None):
    """Task that starts one async ``compute`` job per message received.

    Every non-``None`` message received by ``task`` triggers a new job with
    a fresh ``C`` instance whose ``n`` is a random float in [20, 50].  A
    ``None`` message ends the loop, after which the computation is closed.
    """
    # schedule computation with the scheduler
    if (yield computation.schedule()):
        raise Exception('schedule failed')

    job_no = 0
    while (yield task.receive()) is not None:
        job_no += 1
        payload = C(job_no)
        payload.n = random.uniform(20, 50)
        # unlike in dispycos_client*.py, here 'run_async' is used to run as
        # many tasks as given on servers (i.e., possibly more than one
        # task on a server at any time).
        rtask = yield computation.run_async(compute, payload, task)
        if not isinstance(rtask, pycos.Task):
            print(' %s: rtask failed: %s' % (job_no, rtask))
        else:
            print(' %s: rtask %s created' % (job_no, rtask))

    # unlike in dispycos_httpd1.py, here 'await_async' is not used, so any
    # running async tasks are just terminated.
    yield computation.close()
def client_proc(computation, task=None):
    """Task that runs 15 checksum jobs and prints one line per result.

    Job ``i`` is given the algorithm name at position ``i`` modulo the
    number of algorithms, plus a random float in [5, 10].
    """
    scheduling_error = yield computation.schedule()
    if scheduling_error:
        raise Exception('Could not schedule computation')

    # execute 15 jobs (tasks) and get their results. Note that number of jobs
    # created can be more than number of server processes available; the
    # scheduler will use as many processes as necessary/available, running one
    # job at a server process
    algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
    args = [(algorithms[n % len(algorithms)], random.uniform(5, 10))
            for n in range(15)]
    results = yield computation.run_results(compute, args)

    for position, outcome in enumerate(results):
        succeeded = isinstance(outcome, tuple) and len(outcome) == 3
        if not succeeded:
            print(' rtask failed for %s: %s' % (args[position][0], str(outcome)))
            continue
        print(' %ssum for %s: %s' % (outcome[1], outcome[0], outcome[2]))

    yield computation.close()
def client_proc(computation, task=None):
    """Task that waits two seconds, runs 15 checksum jobs and prints the results.

    Job ``i`` is given the algorithm name at position ``i`` modulo the
    number of algorithms, plus a random float in [1, 3].
    """
    scheduling_error = yield computation.schedule()
    if scheduling_error:
        raise Exception('Could not schedule computation')

    # execute 15 jobs (tasks) and get their results. Note that number of jobs
    # created can be more than number of server processes available; the
    # scheduler will use as many processes as necessary/available, running one
    # job at a server process
    yield task.sleep(2)

    algorithms = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
    args = [(algorithms[n % len(algorithms)], random.uniform(1, 3))
            for n in range(15)]
    results = yield computation.run_results(compute, args)

    for position, outcome in enumerate(results):
        succeeded = isinstance(outcome, tuple) and len(outcome) == 3
        if not succeeded:
            print(' rtask failed for %s: %s' % (args[position][0], str(outcome)))
            continue
        print(' %ssum for %s: %s' % (outcome[1], outcome[0], outcome[2]))

    yield computation.close()
def client_proc(computation, njobs, task=None):
    """Task that submits ``njobs`` jobs one by one with ``computation.run``.

    Each job gets a random float in [5, 10]; a line is printed telling
    whether a remote task was created for it.
    """
    # schedule computation with the scheduler; scheduler accepts one computation
    # at a time, so if scheduler is shared, the computation is queued until it
    # is done with already scheduled computations
    schedule_failed = yield computation.schedule()
    if schedule_failed:
        raise Exception('Could not schedule computation')

    # run jobs
    for job_no in range(njobs):
        # computation is supposed to be CPU bound so 'run' is used so at most
        # one computations runs at a server at any time; for mostly idle
        # computations, use 'run_async' to run more than one computation at a
        # server at the same time.
        rtask = yield computation.run(compute, random.uniform(5, 10))
        if not isinstance(rtask, pycos.Task):
            print('rtask %s failed: %s' % (job_no, rtask))
        else:
            print(' job %s processed by %s' % (job_no, rtask.location))

    # wait for all jobs to be done and close computation
    yield computation.close()
def client_proc(job_id, data_file, rtask, task=None):
    """Task that ships ``data_file`` to a remote task and collects its result file.

    Steps: send the file to ``rtask.location``, deliver a ``C`` object
    describing the input, wait for the reply message, then move the result
    file from pycos's ``dest_path`` into the current working directory.

    NOTE(review): the ``raise StopIteration(-1)`` exits below are kept as in
    the original; under PEP 479 (default since Python 3.7) raising
    ``StopIteration`` inside a generator surfaces as ``RuntimeError`` --
    confirm which interpreter this targets.
    """
    # send input file to rtask.location; this will be saved to dispycos process's
    # working directory
    sent = yield pycos.Pycos().send_file(rtask.location, data_file, timeout=10)
    if sent < 0:
        print('Could not send input data to %s' % rtask.location)
        # terminate remote task
        rtask.send(None)
        raise StopIteration(-1)

    # send info about input
    obj = C(job_id, data_file, random.uniform(5, 8), task)
    delivered = yield rtask.deliver(obj)
    if delivered != 1:
        print('Could not send input to %s' % rtask.location)
        raise StopIteration(-1)

    # rtask sends result to this task as message
    result = yield task.receive()
    if not result.result_file:
        print('Processing %s failed' % obj.i)
        raise StopIteration(-1)

    # rtask saves results file at this client, which is saved in pycos's
    # dest_path, not current working directory!
    saved_at = os.path.join(pycos.Pycos().dest_path, result.result_file)

    # move file to cwd
    target = os.path.join(os.getcwd(), os.path.basename(saved_at))
    os.rename(saved_at, target)
    print(' job %s output is in %s' % (obj.i, target))
def rti_test(task=None):
    """Task that locates the RTI named ``rti_1`` and creates five remote tasks with it.

    A ``monitor_proc`` task is registered as the monitor, then each remote
    task is created, sent one message, and followed by a random sleep of up
    to one second.
    """
    # if server is on remote network, automatic discovery won't work,
    # so add it explicitly
    # yield scheduler.peer('192.168.21.5')

    # get reference to RTI at server
    rti1 = yield pycos.RTI.locate('rti_1')
    print('RTI is at %s' % rti1.location)

    # 5 (remote) tasks are created with rti1
    n = 5
    # set monitor (monitor_proc task) for tasks created for this RTI
    yield rti1.monitor(pycos.Task(monitor_proc, n))

    for index in range(n):
        name = 'test%s' % index
        rtask = yield rti1(name, b=index)
        pycos.logger.debug('RTI %s created' % rtask)
        # If necessary, each rtask can also be set (different) 'monitor'
        rtask.send('msg:%s' % index)
        pause = random.uniform(0, 1)
        yield task.sleep(pause)
def client_proc(computation, njobs, task=None):
    """Task that pairs with a fixed peer, runs ``njobs`` jobs and prints the results.

    Each job receives a random float in [3, 10].
    """
    # schedule computation with the scheduler; scheduler accepts one computation
    # at a time, so if scheduler is shared, the computation is queued until it
    # is done with already scheduled computations
    schedule_failed = yield computation.schedule()
    if schedule_failed:
        raise Exception('Could not schedule computation')

    # pair EC2 node with this client with:
    node = pycos.Location('54.204.242.185', 51347)
    yield pycos.Pycos().peer(node)
    # if multiple nodes are used, 'broadcast' option can be used to pair with
    # all nodes with just one statement as:
    # yield pycos.Pycos().peer(pycos.Location('54.204.242.185', 51347), broadcast=True)

    # execute n jobs (tasks) and get their results. Note that number of
    # jobs created can be more than number of server processes available; the
    # scheduler will use as many processes as necessary/available, running one
    # job at a server process
    job_args = [random.uniform(3, 10) for _ in range(njobs)]
    outcomes = yield computation.run_results(compute, job_args)
    for outcome in outcomes:
        print('result: %s' % outcome)

    yield computation.close()
def client_proc(task=None):
    """Task that publishes four random numbers on a channel with two subscribers.

    ``seqsum`` and ``seqprod`` tasks are subscribed to the ``sum_prod``
    channel; four floats in [0.5, 3] are sent, followed by ``None`` as the
    end-of-data marker, and both tasks are then unsubscribed.
    """
    # create channel
    channel = pycos.Channel('sum_prod')

    # create tasks to compute sum and product of numbers sent
    sum_task = pycos.Task(seqsum)
    prod_task = pycos.Task(seqprod)

    # subscribe tasks to channel so they receive messages
    yield channel.subscribe(sum_task)
    yield channel.subscribe(prod_task)

    # send 4 numbers to channel
    remaining = 4
    while remaining > 0:
        number = random.uniform(0.5, 3)
        channel.send(number)
        print('sent %f' % number)
        remaining -= 1

    # send None to indicate end of data
    channel.send(None)

    yield channel.unsubscribe(sum_task)
    yield channel.unsubscribe(prod_task)
def generate_example(seq_length, min_val, max_val):
    """
    Build one sample for the "adding" task.

    The sample is a list of (a, b) tuples where a is drawn uniformly from
    [min_val, max_val] and b is 1 at two marked positions and 0 elsewhere.
    The target is the sum of the a values at the marked positions.

    :param seq_length: length of the sequence to be generated
    :param min_val: minimum value for a
    :param max_val: maximum value for a
    :return x: list of (a,b) tuples
    :return y: ground truth
    """
    # Marker positions: one in the first FIRST_MARKER percent of the sequence,
    # the other from SECOND_MARKER percent onwards.
    first_upper = int(seq_length * FIRST_MARKER / 100.) - 1
    second_lower = int(seq_length * SECOND_MARKER / 100.)
    b1 = random.randint(0, first_upper)
    b2 = random.randint(second_lower, seq_length - 1)

    # Generate list of tuples
    x = []
    for position in range(seq_length):
        marker = 1. if position in (b1, b2) else 0.
        x.append((random.uniform(min_val, max_val), marker))

    y = x[b1][0] + x[b2][0]
    return x, y
def createMetaball(origin=(0, 0, 0), n=30, r0=4, r1=2.5):
    """Create a metaball object made of ``n`` randomly placed elements.

    Each element has radius ``r1`` and sits at ``origin`` plus an offset
    whose three components are drawn uniformly from [-r0, r0].  The new
    object is linked into the current scene.

    :return: the metaball datablock (not the object wrapping it).
    """
    metaball = bpy.data.metaballs.new('MetaBall')
    holder = bpy.data.objects.new('MetaBallObject', metaball)
    bpy.context.scene.objects.link(holder)

    metaball.resolution = 0.2
    metaball.render_resolution = 0.05

    for _ in range(n):
        base = Vector(origin)
        offset = Vector(random.uniform(-r0, r0) for _axis in range(3))
        blob = metaball.elements.new()
        blob.co = base + offset
        blob.radius = r1

    return metaball
def _send_http_post(self, pause=10):
    """Send a POST header on ``self.socks`` followed by single random characters.

    After the header, up to 9999 characters are written one at a time with
    a random delay of 0.1-3 seconds after each.  The loop ends early and
    ``self.running`` is cleared when the module-level ``stop_now`` flag is
    set.  The socket is closed afterwards.  ``pause`` is not used.
    """
    global stop_now

    header = ("POST / HTTP/1.1\r\n"
              "Host: %s\r\n"
              "User-Agent: %s\r\n"
              "Connection: keep-alive\r\n"
              "Keep-Alive: 900\r\n"
              "Content-Length: 10000\r\n"
              "Content-Type: application/x-www-form-urlencoded\r\n\r\n" %
              (self.host, random.choice(useragents)))
    self.socks.send(header)

    for _ in range(0, 9999):
        if stop_now:
            self.running = False
            break
        p = random.choice(string.letters+string.digits)
        status = term.BOL+term.UP+term.CLEAR_EOL+("Posting: %s" % p)+term.NORMAL
        print(status)
        self.socks.send(p)
        time.sleep(random.uniform(0.1, 3))

    self.socks.close()
def __init__(self, auth_provider, device_info=None):
    """Store the auth provider and device info and set up per-session random fields.

    ``RpcApi.START_TIME`` and ``RpcApi.RPC_ID`` are class-level values that
    are only filled in while they are still ``0``.
    """
    self.log = logging.getLogger(__name__)

    self._auth_provider = auth_provider
    self.device_info = device_info

    # mystical unknown6 - resolved by PokemonGoDev
    self._signal_agglom_gen = False
    self._signature_lib = None

    if RpcApi.START_TIME == 0:
        RpcApi.START_TIME = get_time(ms=True)

    if RpcApi.RPC_ID == 0:
        RpcApi.RPC_ID = int(random.random() * 10 ** 18)
        self.log.debug('Generated new random RPC Request id: %s', RpcApi.RPC_ID)

    # data fields for unknown6
    self.session_hash = os.urandom(32)
    self.token2 = random.randint(1, 59)
    self.course = random.uniform(0, 360)
def genetic_algorithm(population, fitness_fn, ngen=1000, pmut=0.0):
    """[Fig. 4.7] Evolve ``population`` and return its fittest member.

    For ``ngen`` generations a new population of the same size is bred:
    two parents are picked with probability proportional to their fitness,
    a child is formed from a prefix of the first parent and the matching
    suffix of the second, and the child is mutated with probability
    ``pmut``.

    :param population: sequence of individuals; each must support ``len``,
        slicing, ``+`` and a ``mutate()`` method.
    :param fitness_fn: callable mapping an individual to its weight.
    :param ngen: number of generations to run.
    :param pmut: probability, in [0, 1], that a child is mutated.
    :return: ``argmax(population, fitness_fn)`` over the final population.
    """
    def reproduce(p1, p2):
        # Single-point crossover at a random cut position.
        c = random.randrange(len(p1))
        return p1[:c] + p2[c:]

    for _generation in range(ngen):
        new_population = []
        # One child per slot keeps the population size constant
        # (the original iterated over ``len(population)``, a TypeError).
        for _slot in range(len(population)):
            p1, p2 = random_weighted_selection(population, 2, fitness_fn)
            child = reproduce(p1, p2)
            # Mutate with probability pmut; the original test was inverted
            # and mutated almost every child at the default pmut=0.0.
            if random.uniform(0, 1) < pmut:
                child.mutate()
            new_population.append(child)
        population = new_population
    return argmax(population, fitness_fn)
def random_weighted_selection(seq, n, weight_fn):
    """Pick n elements of seq, weighted according to weight_fn.

    That is, apply weight_fn to each element of seq, add up the total.
    Then choose an element e with probability weight[e]/total.
    Repeat n times, with replacement.

    Weights must be non-negative so that the running totals are sorted.

    :param seq: indexable sequence to draw from.
    :param n: number of draws; ``n <= 0`` yields an empty list.
    :param weight_fn: callable returning the weight of one element.
    :return: list of exactly ``n`` selected elements.
    :raises IndexError: if ``seq`` is empty and ``n > 0``.
    :raises ValueError: if the total weight is not positive and ``n > 0``.
    """
    from bisect import bisect_left, bisect_right

    totals = []
    runningtotal = 0
    for item in seq:
        runningtotal += weight_fn(item)
        totals.append(runningtotal)

    if n <= 0:
        return []
    if not totals:
        raise IndexError("cannot select from an empty sequence")
    total = totals[-1]
    if total <= 0:
        raise ValueError("total weight must be positive")

    selections = []
    for _ in range(n):
        r = random.uniform(0, total)
        # First index whose running total exceeds r.
        i = bisect_right(totals, r)
        if i == len(totals):
            # uniform() may return the upper endpoint itself; fall back to
            # the last element with positive weight instead of dropping the
            # draw, as the original scan did.
            i = bisect_left(totals, total)
        selections.append(seq[i])
    return selections

#_____________________________________________________________________________
# The remainder of this file implements examples for the search algorithms.

#______________________________________________________________________________
# Graphs and Graph Problems
def RandomGraph(nodes=range(10), min_links=2, width=400, height=300,
                curvature=lambda: random.uniform(1.1, 1.5)):
    """Construct a random graph, with the specified nodes, and random links.

    Every node gets a random location on a (width x height) rectangle and
    is then linked to its nearest not-yet-linked neighbor until it has at
    least min_links links.  Since links are undirected, some nodes end up
    with more.  A link's length is the straight-line distance multiplied by
    curvature(), which defaults to a random number between 1.1 and 1.5,
    truncated to an integer."""
    g = UndirectedGraph()
    g.locations = {}

    ## Build the cities
    for node in nodes:
        g.locations[node] = (random.randrange(width), random.randrange(height))

    ## Build roads from each city to at least min_links nearest neighbors.
    for _ in range(min_links):
        for node in nodes:
            if len(g.get(node)) >= min_links:
                continue
            here = g.locations[node]

            # The node itself and already-linked nodes are treated as
            # infinitely far so that argmin never picks them.
            def distance_to_node(n):
                unavailable = n is node or g.get(node, n)
                if unavailable:
                    return infinity
                return distance(g.locations[n], here)

            neighbor = argmin(nodes, distance_to_node)
            length = distance(g.locations[neighbor], here) * curvature()
            g.connect(node, neighbor, int(length))
    return g
async def rate(self, message_object, user):
    """Reply in the message's channel with a random rating for ``user``.

    The rating is a float drawn uniformly from [1, 100] and rounded to two
    decimals.  The message's mentions are printed to stdout before the
    reply is sent.

    Declared ``async`` because the body awaits ``send_message``; ``await``
    inside a plain ``def`` is a syntax error.

    :param message_object: incoming message; its ``channel`` and
        ``mentions`` attributes are read.
    :param user: name to rate, inserted into the reply text.
    """
    number = round(random.uniform(1, 100), 2)
    print(message_object.mentions)
    reply = "I would rate **{0}** {1}/100".format(user, number)
    await self.pm.client.send_message(message_object.channel, reply)
def __call__(self, img):
    """Randomly crop ``img`` and resize the crop to a ``self.size`` square.

    Up to ten attempts are made to draw a crop covering 70-98% of the
    image area with an aspect ratio between 5/8 and 8/5 (width and height
    are swapped half of the time).  The first crop that fits inside the
    image is resized with ``self.interpolation`` and returned.  When no
    attempt fits, the image is passed through ``Scale`` and then
    ``CenterCrop`` instead.
    """
    attempts_left = 10
    while attempts_left > 0:
        attempts_left -= 1

        img_w = img.size[0]
        img_h = img.size[1]
        crop_area = random.uniform(0.70, 0.98) * (img_w * img_h)
        shape = random.uniform(5. / 8, 8. / 5)

        w = int(round(math.sqrt(crop_area * shape)))
        h = int(round(math.sqrt(crop_area / shape)))
        if random.random() < 0.5:
            w, h = h, w

        fits = w <= img_w and h <= img_h
        if fits:
            x1 = random.randint(0, img_w - w)
            y1 = random.randint(0, img_h - h)
            box = (x1, y1, x1 + w, y1 + h)
            piece = img.crop(box)
            assert (piece.size == (w, h))
            return piece.resize((self.size, self.size), self.interpolation)

    # Fallback
    to_scale = Scale(self.size, interpolation=self.interpolation)
    to_center = CenterCrop(self.size)
    return to_center(to_scale(img))
def test_put():
    """``put`` returns ``None`` and stores the value under its type, for ints and floats."""
    sink = IntFloatDataSink()

    # Integers are drawn and checked first, then floats.
    for kind, draw in ((int, random.randint), (float, random.uniform)):
        samples = [draw(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
        for sample in samples:
            outcome = sink.put(kind, sample)
            assert outcome is None
            assert sample in sink.items[kind]
def test_wildcard_put():
    """The wildcard sink's ``put`` returns ``None`` and stores ints and floats under their type."""
    sink = SimpleWildcardDataSink()

    def check(kind, samples):
        # Every sample must be accepted silently and then be retrievable.
        for sample in samples:
            assert sink.put(kind, sample) is None
            assert sample in sink.items[kind]

    check(int, [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)])
    check(float, [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)])


#####################
# Put Many Function #
#####################
def test_put_many():
    """``put_many`` returns ``None`` and stores the repeated value, for ints and floats."""
    sink = IntFloatDataSink()

    # Integers are drawn and checked first, then floats.
    for kind, draw in ((int, random.randint), (float, random.uniform)):
        samples = [draw(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
        for sample in samples:
            # A generator that yields the same sample VALUES_COUNT times.
            stream = (sample for _ in range(VALUES_COUNT))
            outcome = sink.put_many(kind, stream)
            assert outcome is None
            assert sample in sink.items[kind]
def test_wildcard_put_many():
    """The wildcard sink's ``put_many`` returns ``None`` and stores ints and floats under their type."""
    sink = SimpleWildcardDataSink()

    def check(kind, samples):
        for sample in samples:
            # A generator that yields the same sample VALUES_COUNT times.
            stream = (sample for _ in range(VALUES_COUNT))
            assert sink.put_many(kind, stream) is None
            assert sample in sink.items[kind]

    check(int, [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)])
    check(float, [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)])


#####################
# CompositeDataSink #
#####################
def test_get():
    """``get`` returns a value of exactly the requested type equal to the queried value."""
    source = IntFloatDataSource()

    # Integers are drawn and checked first, then floats.
    for kind, draw in ((int, random.randint), (float, random.uniform)):
        samples = [draw(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)]
        for sample in samples:
            found = source.get(kind, {VALUE_KEY: sample})
            assert type(found) is kind
            assert found == sample
def test_wildcard_get():
    """The wildcard source's ``get`` returns correctly typed ints and floats equal to the query value."""
    source = SimpleWildcardDataSource()

    def check(kind, samples):
        for sample in samples:
            found = source.get(kind, {VALUE_KEY: sample})
            assert type(found) is kind
            assert found == sample

    check(int, [random.randint(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)])
    check(float, [random.uniform(-VALUES_MAX, VALUES_MAX) for _ in range(VALUES_COUNT)])


#####################
# Get Many Function #
#####################
def mutation(cls, indiv):
    """Return a copy of ``indiv`` with its largest-magnitude components re-drawn.

    A random count between 1 and ``len(x) - 1`` is chosen (always 1 for a
    single-component individual); the components of ``indiv`` with the
    largest absolute values are selected and, in the duplicate, replaced by
    uniform draws between the two class bounds.  ``indiv`` itself is only
    read.
    """
    newInd = indiv.duplicate()
    quota = int(random.random() * (len(newInd.__x) - 1)) + 1

    genes = indiv.__x
    chosen = []
    for position, gene in enumerate(genes):
        # Fill the quota with the first components encountered.
        if len(chosen) < quota:
            chosen.append(position)
            continue
        # Afterwards, replace the smallest selected component whenever a
        # strictly larger one shows up (first minimum wins on ties).
        weakest = 0
        for slot, picked in enumerate(chosen):
            if abs(genes[picked]) < abs(genes[chosen[weakest]]):
                weakest = slot
        if abs(gene) > abs(genes[chosen[weakest]]):
            chosen[weakest] = position

    for position in chosen:
        newInd.__x[position] = random.uniform(cls.__BOUNDS[0], cls.__BOUNDS[1])

    return newInd
def __generateRandomVarValue(cls, iVar):
    """Draw a random value for variable ``iVar`` and cast it to ``cls.VARIABLES_TYPE``.

    ``cls.VARIABLES_RANGES[iVar]`` is either ``None`` (the draw spans
    ``-sys.maxint - 1`` to ``sys.maxint``) or a string such as ``"[0, 1["``
    whose first and last characters tell whether each bound is included.
    """
    # 1- Get the definition domain of the variable
    spec = cls.VARIABLES_RANGES[iVar]

    if spec is None:
        drawn = random.uniform(-sys.maxint-1, sys.maxint)
        # 5- Cast the variable type
        return cls.VARIABLES_TYPE(drawn)

    # 2- Check the open/closed bounds
    lowIncluded = spec[0] == '['
    highIncluded = spec[-1] == ']'

    # 3- Get a random number in the domain
    # NOTE(review): eval() runs whatever the range string contains; confirm
    # VARIABLES_RANGES never comes from untrusted input.
    bounds = eval('[' + spec[1:-1] + ']')
    low = bounds[0]
    high = bounds[1]

    # 4- Check the bounds: redraw while an excluded bound is hit
    while True:
        drawn = random.random()*(high-low) + low
        hitsOpenLow = drawn == low and not lowIncluded
        hitsOpenHigh = drawn == high and not highIncluded
        if not (hitsOpenLow or hitsOpenHigh):
            break

    # 5- Cast the variable type
    return cls.VARIABLES_TYPE(drawn)

# ----------------------
def recaptcha(self):
    """
    Ask the server whether a captcha must be solved and, if so, open it.

    When the response says a captcha is required, the captcha page is
    opened in the default browser, its link is printed, and the call blocks
    until the user presses enter.

    If we choose to not solve the captcha should it be required,
    we will then be considered a 'lurker' and we will not be able to chat
    and our name will be shown as a guest name in the room.
    """
    # NOTE(review): the bounds are reversed (0.9, 0.10); random.uniform
    # accepts that and yields a value between 0.10 and 0.9 -- confirm this
    # is the intended range.
    t = str(random.uniform(0.9, 0.10))
    _url = 'https://tinychat.com/cauth/captcha?{0}'.format(t)

    _response = util.web.http_get(url=_url, json=True, proxy=self.proxy)
    log.debug('recaptcha response: %s' % _response)

    if _response['json'] is None:
        return
    if _response['json']['need_to_solve_captcha'] != 1:
        return

    token = _response['json']['token']
    link = 'https://tinychat.com/cauth/recaptcha?token={0}'.format(token)
    webbrowser.open(link, new=True)
    print (link)
    raw_input('Solve the captcha and click enter to continue.')
def mutation(self, mutation_type):
    """Apply the mutation named ``mutation_type``, subject to a random gate.

    A value is drawn uniformly between ``self.mutation_lower_threshold`` and
    ``self.mutation_higher_threshold``; nothing happens when it falls below
    ``self.mutation_threshold``.  Otherwise the matching ``mutate_*`` method
    is called.

    :raises GenomeError: if ``mutation_type`` is not in ``MUTATION_TYPES``,
        or is listed there but has no handler here.
    """
    if mutation_type not in MUTATION_TYPES:
        raise GenomeError("mutation type not supported")

    drawn = random.uniform(self.mutation_lower_threshold,
                           self.mutation_higher_threshold)
    if drawn < self.mutation_threshold:
        return

    # Each supported type maps to the method called "mutate_<type>".
    handled = ("add_node", "remove_node", "add_connection", "remove_connection")
    if mutation_type not in handled:
        raise GenomeError("something wrong happened in mutation.")
    getattr(self, "mutate_" + mutation_type)()
def serviceB(context=None):
    """Run worker "B": serve ``Service B`` requests until told to stop.

    A DEALER socket with identity ``b'B'`` is connected to
    ``tcp://localhost:5560``.  Every received message is printed under
    ``myLock``.  A ``Service B`` request is answered after a random pause
    of up to half a second, ``END`` stops the loop, and any other message
    is reported as an identity mix-up and also stops the loop.

    Messages are compared against bytes literals and printed with
    single-argument ``print(...)`` calls, so the function behaves the same
    on Python 2 and also works on Python 3, where ``recv()`` returns bytes.

    :param context: optional zmq context to reuse; the shared instance is
        used when omitted.
    """
    # reuse context if it exists, otherwise make a new one
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)

    # identify worker
    service.setsockopt(zmq.IDENTITY, b'B')
    service.connect("tcp://localhost:5560")

    while True:
        message = service.recv()
        with myLock:
            print("Service B got:")
            print(message)
        if message == b"Service B":
            # do some work
            time.sleep(random.uniform(0, 0.5))
            service.send(b"Service B cleaned your room")
        elif message == b"END":
            break
        else:
            with myLock:
                print("the server has the wrong identities!")
            break
def query(self, images):
    """Return a batch built from ``images`` and previously stored images.

    With ``pool_size == 0`` the input is returned untouched.  Otherwise
    each image of ``images.data`` is handled in turn: while the pool is not
    full it is stored and passed through; once full, with probability
    one half it replaces a randomly chosen stored image, and a clone of
    the replaced image is returned in its place.  The selected images are
    concatenated along dimension 0 and wrapped in ``Variable``.
    """
    if self.pool_size == 0:
        return images

    batch = []
    for sample in images.data:
        sample = torch.unsqueeze(sample, 0)

        # Pool still filling up: remember the image and pass it through.
        if self.num_imgs < self.pool_size:
            self.num_imgs += 1
            self.images.append(sample)
            batch.append(sample)
            continue

        if random.uniform(0, 1) > 0.5:
            # Swap: hand back an older image and keep the new one instead.
            slot = random.randint(0, self.pool_size-1)
            previous = self.images[slot].clone()
            self.images[slot] = sample
            batch.append(previous)
        else:
            batch.append(sample)

    return Variable(torch.cat(batch, 0))