我们从 Python 开源项目中提取了以下 45 个代码示例，用于说明如何使用 random.getrandbits()。
def test_uint_multi_port(self, x_series_device, seed):
    """Round-trip random unsigned values through two combined digital ports.

    Two ports of at most 16 lines each are sampled from
    ``x_series_device.do_ports`` and added as one channel. Ten random values
    sized to the combined port width are written one at a time, and each
    value read back must equal the value written.
    """
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    narrow_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 16]
    chosen_ports = random.sample(narrow_ports, 2)
    combined_width = sum(port.do_port_width for port in chosen_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            flatten_channel_string([port.name for port in chosen_ports]),
            line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        def write_then_read(value):
            # Give the output a moment to settle before sampling it again.
            task.write(value)
            time.sleep(0.001)
            return task.read()

        # Generate random values to test.
        values_to_test = [
            int(random.getrandbits(combined_width)) for _ in range(10)]
        values_read = [write_then_read(value) for value in values_to_test]

        assert values_read == values_to_test
def test_one_sample_one_line(self, x_series_device, seed):
    """Round-trip single boolean samples on one randomly chosen digital line.

    Uses the stream writer/reader pair on the task's output and input
    streams; ten random booleans are written and each must read back equal.
    """
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    line_name = random.choice(x_series_device.do_lines).name

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            line_name, line_grouping=LineGrouping.CHAN_PER_LINE)

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        def round_trip(sample):
            writer.write_one_sample_one_line(sample)
            time.sleep(0.001)
            return reader.read_one_sample_one_line()

        # Generate random values to test.
        values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]
        values_read = [round_trip(sample) for sample in values_to_test]

        numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_byte(self, x_series_device, seed):
    """Round-trip single byte-wide port samples on a port of at most 8 lines."""
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    byte_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 8]
    do_port = random.choice(byte_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        width = do_port.do_port_width
        values_to_test = [int(random.getrandbits(width)) for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        def round_trip(sample):
            writer.write_one_sample_port_byte(sample)
            time.sleep(0.001)
            return reader.read_one_sample_port_byte()

        values_read = [round_trip(sample) for sample in values_to_test]

        numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_uint16(self, x_series_device, seed):
    """Round-trip single uint16 port samples on a port of at most 16 lines."""
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    candidate_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 16]
    do_port = random.choice(candidate_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        width = do_port.do_port_width
        values_to_test = [int(random.getrandbits(width)) for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        def round_trip(sample):
            writer.write_one_sample_port_uint16(sample)
            time.sleep(0.001)
            return reader.read_one_sample_port_uint16()

        values_read = [round_trip(sample) for sample in values_to_test]

        numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_uint32(self, x_series_device, seed):
    """Round-trip single uint32 port samples on a port of at most 32 lines."""
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    candidate_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 32]
    do_port = random.choice(candidate_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        width = do_port.do_port_width
        values_to_test = [int(random.getrandbits(width)) for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        def round_trip(sample):
            writer.write_one_sample_port_uint32(sample)
            time.sleep(0.001)
            return reader.read_one_sample_port_uint32()

        values_read = [round_trip(sample) for sample in values_to_test]

        numpy.testing.assert_array_equal(values_read, values_to_test)
def random_context():
    """Build a context dict where each known key is included on a coin flip.

    For every key, one random bit is drawn first; the value factory is only
    called when that bit is set, so the sequence of random draws matches a
    straight-line series of ``if random.getrandbits(1)`` checks.

    Returns:
        dict: a possibly empty mapping holding a subset of the known keys.
    """
    # (key, value factory) pairs, in the order the coin flips are drawn.
    optional_fields = (
        ("function_name", lambda: RunLambdaCliTest.random_string()),
        ("function_version", lambda: RunLambdaCliTest.random_string()),
        ("invoked_function_arn", lambda: RunLambdaCliTest.random_string()),
        ("memory_limit_in_mb", lambda: str(random.randint(100, 200))),
        ("aws_request_id", lambda: RunLambdaCliTest.random_string()),
        ("log_group_name", lambda: RunLambdaCliTest.random_string()),
        ("log_stream_name", lambda: RunLambdaCliTest.random_string()),
        ("identity", lambda: RunLambdaCliTest.random_identity()),
        ("client_context", lambda: RunLambdaCliTest.random_client_context()),
    )

    result = {}
    for key, make_value in optional_fields:
        if random.getrandbits(1):
            result[key] = make_value()
    return result
def __init__(self, *args, **kwds):
    """Create the shared-memory buffer, initialise the view and start the render timer.

    On Windows a tagged anonymous mmap is used; elsewhere a named temporary
    file is created and memory-mapped. ``self.close`` is registered with
    ``atexit``. All positional and keyword arguments are forwarded to
    ``GraphicsView.__init__``.
    """
    ## Create shared memory for rendered image
    #pg.dbg(namespace={'r': self})
    if sys.platform.startswith('win'):
        # 20 random lowercase letters; (value % 25) + 97 maps onto 'a'..'y'.
        self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
        self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
    else:
        self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
        self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
        # The file object is buffered: push the zero bytes to disk so the file
        # really has the mapped length before its descriptor is handed to mmap.
        self.shmFile.flush()
        fd = self.shmFile.fileno()
        self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
    atexit.register(self.close)

    GraphicsView.__init__(self, *args, **kwds)
    self.scene().changed.connect(self.update)
    self.img = None
    self.renderTimer = QtCore.QTimer()
    self.renderTimer.timeout.connect(self.renderView)
    self.renderTimer.start(16)
def main():
    """Connect to a local ZeroMQ PAIR socket and answer every turn with one bit.

    After sending a greeting, the loop runs forever: it receives and prints
    two messages per turn (a reward and an input message) and then replies
    with a single character, '0' or '1'.
    """
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string(str('hello'))
    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)
        # think...
        # Default reply is '1', with a random bit every 7th turn; on even
        # turns the reply is taken from the fixed pattern instead.
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            msg_out = str(message[cnt % 8])
        # msg_out is text, so send it the same way as the greeting above;
        # socket.send() expects bytes rather than str.
        socket.send_string(msg_out)
        cnt = cnt + 1
def gen_pseudo_word(self, L=None):
    """Return a pseudo word that is not yet a key of ``self.inv_word_mapping``.

    Args:
        L: desired word length; when falsy, a length between 1 and 8 is drawn.

    Returns:
        str: when ``self.readable`` is set, characters drawn (with
        replacement) alternately from ``self.V`` and ``self.C``; otherwise
        ``L`` distinct lowercase ASCII letters.
    """
    if not L:
        L = random.randint(1, 8)

    # Keep drawing until we hit a word that has not been used before.
    while True:
        if self.readable:
            # Alternate between vowels and consonants, sampled with replacement.
            half = int(math.ceil(L / 2))
            vowels = [random.choice(self.V) for _ in range(half)]
            consonants = [random.choice(self.C) for _ in range(half)]
            if random.getrandbits(1):
                pairs = zip(vowels, consonants)
            else:
                pairs = zip(consonants, vowels)
            candidate = ''.join(ch for pair in pairs for ch in pair)[:L]
        else:
            candidate = ''.join(random.sample(string.ascii_lowercase, L))

        if candidate not in self.inv_word_mapping:
            return candidate
def dated_path(obj, file_data):
    """Build a ``prefix/year/month/filename`` path for an uploaded file.

    Args:
        obj: object whose ``model_name`` attribute supplies the prefix;
            ``"undefined"`` is used when the attribute cannot be read.
        file_data: object exposing the original name as ``filename``.

    Returns:
        The path string. The file name gets a random 16-bit integer inserted
        before its extension and is passed through ``secure_filename``.
    """
    try:
        prefix = getattr(obj, 'model_name')
    except Exception:
        # Best-effort fallback, but let KeyboardInterrupt/SystemExit propagate.
        prefix = "undefined"

    name, ext = op.splitext(file_data.filename)
    rand = random.getrandbits(16)
    filename = u"{name}_{rand}{ext}".format(rand=rand, name=name, ext=ext)
    filename = secure_filename(filename)

    today = date.today()
    path = u"{prefix}/{t.year}/{t.month}/{filename}".format(
        prefix=prefix, t=today, filename=filename
    )
    return path
def action_clone_item(self, ids):
    """Clone the single selected item and redirect to the clone's edit view.

    When more than one id is given, an error is flashed and nothing is
    cloned. Otherwise the item is deep-copied without its ``_id``, its slug
    gets a random 32-bit suffix, it is flagged with ``_isclone`` and inserted
    into ``self.coll`` between the model-change hooks.
    """
    if len(ids) > 1:
        flash("You can select only one item for this action", 'error')
        return

    original = current_app.db.get_with_content(_id=ids[0])
    clone = deepcopy(original)
    del clone['_id']
    suffix = random.getrandbits(32)
    clone['slug'] = '{}-{}'.format(clone["slug"], suffix)
    clone['_isclone'] = True

    self._on_model_change(None, clone, True)
    self.coll.insert(clone)
    self.after_model_change(None, clone, True)

    return redirect(url_for('.edit_view', id=clone['_id']))

# TODO: Serialize and activate this action
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    `http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.
    """
    # Two 64-bit words from the pseudo-random generator, mixed with ctokens.
    high = random.getrandbits(64)
    low = random.getrandbits(64)

    if HAVE_URANDOM:
        # Fold in the 16 bytes obtained from fast_urandom16().
        urand_longs = _struct_2_long_long.unpack(fast_urandom16())
        high ^= urand_longs[0]
        low ^= urand_longs[1]

    byte_s = _struct_2_long_long.pack(high ^ ctokens[0], low ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
def main():
    """Print sample output from several functions of the ``random`` module."""
    random.seed()  # called without an argument

    print(random.getrandbits(3))
    print(random.randint(200, 800))
    # NOTE(review): prints the literals 2, 400, 2 rather than a random value;
    # possibly random.randrange(2, 400, 2) was intended -- confirm.
    print(2, 400, 2)

    # Selection helpers operating on a sequence.
    seq = range(1, 10)
    print(random.choice(seq))
    print(random.sample(seq, 4))

    shuffled = list(seq)
    random.shuffle(shuffled)
    print(shuffled)

    # Floating-point draws.
    print(random.random())  # float in [0.0, 1.0)
    print(random.uniform(2.1, 4.99))
def __init__(self, length=0, defined=False, value=None):
    """
    Creates a bit string of a certain length, using a certain underlying
    implementation.

    Args:
        length: number of bits; used to size both ``self.data`` and
            ``self.defined``.
        defined: when truthy, ``self.defined`` is set to ``None``; otherwise
            it is a bitarray with every bit cleared.
        value: optional iterable; when given, ``self.data`` is replaced by a
            plain list of booleans built from it.
    """
    self.length = length
    self.fitness = 0
    self.isCacheValid = False
    self.defined = bitarray(length)
    self.data = bitarray(length)
    if defined:
        self.defined = None
    else:
        self.defined.setall(False)
    if value is not None:
        # Note: this swaps the bitarray for a list of bools.
        self.data = [bool(X) for X in value]
    # NOTE(review): the original indentation was lost, so it is unclear
    # whether this loop belongs inside the ``if value is not None`` branch
    # above -- confirm against the upstream source.
    # Flips each position 1..length with probability one half.
    for i in range(1, length+1):
        if bool(random.getrandbits(1)):
            self.flip(i)
def test_wrong_method(self):
    """A handshake sent with POST instead of GET must yield ConnectionFailed."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, as the Sec-WebSocket-Key value.
    nonce = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b'POST ' + test_path.encode('ascii') + b' HTTP/1.1',
        b'Host: ' + test_host.encode('ascii'),
        b'Connection: Upgrade',
        b'Upgrade: WebSocket',
        b'Sec-WebSocket-Version: 13',
        b'Sec-WebSocket-Key: ' + nonce,
        b'',
        b'',
    ]
    request = b'\r\n'.join(header_lines)

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_bad_connection(self):
    """A handshake with a bogus ``Connection`` header must yield ConnectionFailed."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, as the Sec-WebSocket-Key value.
    nonce = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b'GET ' + test_path.encode('ascii') + b' HTTP/1.1',
        b'Host: ' + test_host.encode('ascii'),
        b'Connection: Zoinks',
        b'Upgrade: WebSocket',
        b'Sec-WebSocket-Version: 13',
        b'Sec-WebSocket-Key: ' + nonce,
        b'',
        b'',
    ]
    request = b'\r\n'.join(header_lines)

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_bad_upgrade(self):
    """A handshake with a bogus ``Upgrade`` header must yield ConnectionFailed."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, as the Sec-WebSocket-Key value.
    nonce = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b'GET ' + test_path.encode('ascii') + b' HTTP/1.1',
        b'Host: ' + test_host.encode('ascii'),
        b'Connection: Upgrade',
        b'Upgrade: WebPocket',
        b'Sec-WebSocket-Version: 13',
        b'Sec-WebSocket-Key: ' + nonce,
        b'',
        b'',
    ]
    request = b'\r\n'.join(header_lines)

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_missing_version(self):
    """A handshake lacking ``Sec-WebSocket-Version`` must yield ConnectionFailed."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, as the Sec-WebSocket-Key value.
    nonce = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b'GET ' + test_path.encode('ascii') + b' HTTP/1.1',
        b'Host: ' + test_host.encode('ascii'),
        b'Connection: Upgrade',
        b'Upgrade: WebSocket',
        b'Sec-WebSocket-Key: ' + nonce,
        b'',
        b'',
    ]
    request = b'\r\n'.join(header_lines)

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionFailed)
def test_accept_wrong_subprotocol(self):
    """Accepting with a subprotocol the client did not propose raises ValueError."""
    test_host = 'frob.nitz'
    test_path = '/fnord'

    ws = WSConnection(SERVER)

    # 16 random bytes, base64-encoded, as the Sec-WebSocket-Key value.
    nonce = base64.b64encode(bytes(random.getrandbits(8) for _ in range(16)))

    header_lines = [
        b'GET ' + test_path.encode('ascii') + b' HTTP/1.1',
        b'Host: ' + test_host.encode('ascii'),
        b'Connection: Upgrade',
        b'Upgrade: WebSocket',
        b'Sec-WebSocket-Version: 13',
        b'Sec-WebSocket-Key: ' + nonce,
        b'Sec-WebSocket-Protocol: one, two',
        b'',
        b'',
    ]
    request = b'\r\n'.join(header_lines)

    ws.receive_bytes(request)
    event = next(ws.events())
    assert isinstance(event, ConnectionRequested)
    assert event.proposed_subprotocols == ['one', 'two']

    with pytest.raises(ValueError):
        ws.accept(event, 'three')
def generate(
    cls,
    user_id=lambda: random.getrandbits(16),
    contents=lambda: random_alphanumeric_string(length=8192),
    expiry_time=lambda: int(time.time() + random.getrandbits(16)),
    title=lambda: random_alphanumeric_string(),
    language=lambda: random.choice(['python', 'css', 'javascript', 'text']),
    password=lambda: random_alphanumeric_string(),
    is_api_post=lambda: False,
):
    """Create a paste from the given field values or their random defaults.

    Every argument is passed through ``cls.random_or_specified_value`` and
    the results are handed to ``database.paste.create_new_paste``, whose
    return value is returned.
    """
    resolve = cls.random_or_specified_value
    # Resolved top to bottom; ``contents`` comes first, ahead of ``user_id``.
    fields = {
        'contents': resolve(contents),
        'user_id': resolve(user_id),
        'expiry_time': resolve(expiry_time),
        'title': resolve(title),
        'language': resolve(language),
        'password': resolve(password),
        'is_api_post': resolve(is_api_post),
    }
    return database.paste.create_new_paste(**fields)
def generate(
    cls,
    paste_id=lambda: random.getrandbits(16),
    file_name=lambda: random_alphanumeric_string(),
    file_size=lambda: random.getrandbits(16),
    mime_type=lambda: 'image/png',
    file_data=lambda: random_alphanumeric_string(8192)
):
    """Create an attachment from the given field values or their random defaults.

    Every argument is passed through ``cls.random_or_specified_value``.
    ``database.attachment._store_attachment_file`` is patched out for the
    duration of the ``create_new_attachment`` call, whose result is returned.
    """
    resolve = cls.random_or_specified_value
    with mock.patch.object(database.attachment, '_store_attachment_file'):
        fields = {
            'paste_id': resolve(paste_id),
            'file_name': resolve(file_name),
            'file_size': resolve(file_size),
            'mime_type': resolve(mime_type),
            'file_data': resolve(file_data),
        }
        return database.attachment.create_new_attachment(**fields)
def random_marker():
    """ A random marker used to identify a pytest run.

    Some tests will spawn a private chain, the private chain will be one or
    more ethereum nodes on a new subprocess. These nodes may fail to start on
    concurrent test runs, mostly because of port number conflicts, but even
    though the test fails to start its private chain it may run interacting
    with the geth process from a different test run! This leads to
    unreasonable test errors.

    This fixture creates a random marker used to distinguish pytest runs and
    avoid test failures. Note this could fail for other reasons and fail to
    detect unwanted interactions if the user sets the PYTHONHASHSEED to the
    same value.

    Returns:
        str: the lowercase hexadecimal digits of a random 100-bit integer,
        without any ``0x`` prefix or ``L`` suffix.
    """
    # format(..., 'x') emits bare hex digits. Slicing hex() output with
    # [2:-1] only suits a trailing 'L'; without one it drops a real digit.
    return format(random.getrandbits(100), 'x')
def circlepoint(c,r):
    """Generate a point [x,y] lying on a circle by the circle equation.

    Args:
        c (list): The position of circle centre.
        r (float): The radius of the circle.

    Returns:
        A point lies on a circle. The point position is random.
    """
    # Pick the horizontal offset first, then decide on which side of the
    # centre (vertically) the point falls.
    dx = random.uniform(-r,r)
    below_centre = bool(random.getrandbits(1))

    # Solve the circle equation for the vertical offset.
    dy = math.sqrt(r**2-dx**2)
    dy = -dy if below_centre else dy

    centre_x, centre_y = c[0], c[1]
    return [dx+centre_x, dy+centre_y]
def encode_params(self, base_url, method, params):
    """Return the encoded parameters with an HMAC-SHA1 ``oauth_signature`` appended.

    Args:
        base_url: URL used as the second component of the signed message.
        method: HTTP method name; upper-cased before signing.
        params: request parameters; the mapping is copied, not modified.

    Returns:
        The sorted, encoded parameters followed by
        ``&oauth_signature=<quoted base64 signature>``.
    """
    params = params.copy()
    if self.token:
        params['oauth_token'] = self.token
    params.update({
        'oauth_consumer_key': self.consumer_key,
        'oauth_signature_method': 'HMAC-SHA1',
        'oauth_version': '1.0',
        'oauth_timestamp': str(int(time())),
        'oauth_nonce': str(getrandbits(64)),
    })

    enc_params = urlencode_noplus(sorted(params.items()))

    # Signing key: consumer secret and quoted token secret joined by '&'.
    quoted_token_secret = urllib_parse.quote(self.token_secret, safe='~')
    key = self.consumer_secret + "&" + quoted_token_secret

    # Signed message: quoted method, URL and parameter string joined by '&'.
    components = [method.upper(), base_url, enc_params]
    message = '&'.join(
        urllib_parse.quote(component, safe='~') for component in components)

    mac = hmac.new(key.encode('ascii'), message.encode('ascii'), hashlib.sha1)
    signature = base64.b64encode(mac.digest())

    quoted_signature = urllib_parse.quote(signature, safe='~')
    return enc_params + "&" + "oauth_signature=" + quoted_signature
def gouv_api_csv(csv):
    """Append fake geocoding result columns to every data row of a CSV text.

    The first line of ``csv`` is dropped. Each remaining line is extended
    with 13 extra columns: fake result values on a coin flip (always for the
    line containing the coordinates used by the tests), otherwise empty.

    Returns:
        str: the extended rows, each terminated by a newline.
    """

    def fake_result_columns():
        # Column values are produced in this exact order.
        return [
            str(fake.latitude()),  # "result_latitude"
            str(fake.longitude()),  # "result_longitude"
            fake.address().replace('\n', ' ').replace(',', ' '),  # "result_label"
            str(random.randint(0, 300)),  # "result_distance"
            "housenumber",  # "result_type"
            "44109_XXXX_984eec",  # "result_id"
            fake.building_number(),  # "result_housenumber"
            fake.street_name(),  # "result_name", the name of the street.
            '',  # "result_street", empty in most cases.
            fake.postcode(),  # "result_postcode"
            fake.city(),  # "result_city"
            '"' + fake.postcode()[:2] + ' ' + fake.department()[1] + ', ' + fake.region() + '"',  # "result_context"
            fake.postcode()[:2] + str(random.randint(100, 300)),  # "result_citycode"
        ]

    rows = []
    for line in csv.splitlines()[1:]:
        # Always fills the data of the result used for tests.
        # (previously matched on "64,1460200,-21,9419851" in line)
        if bool(random.getrandbits(1)) or "-21.9419851,64.14602" in line:
            suffix = ',' + ','.join(fake_result_columns())
        else:
            suffix = ',,,,,,,,,,,,,'
        rows.append(line + suffix + '\n')
    return ''.join(rows)
def result_properties():
    """Return a properties dict with ``access`` set and random optional keys.

    ``name`` and ``phone`` are each added on one coin flip. ``fee`` and
    ``opening_hours`` get their first value on one coin flip, else their
    second value on a further coin flip, else they are left out.
    """

    def coin_flip():
        return bool(random.getrandbits(1))

    properties = {"access": "yes"}

    if coin_flip():
        properties["name"] = fake.company()
    if coin_flip():
        properties["phone"] = fake.phone_number()

    # (key, first choice, second choice)
    two_way_fields = (
        ("fee", "yes", "no"),
        ("opening_hours", "Mo-Su 09:00-19:00", "Mo-Th 12:30-16:30"),
    )
    for key, first, second in two_way_fields:
        if coin_flip():
            properties[key] = first
        elif coin_flip():
            properties[key] = second

    return properties
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
    """
    This function follows from the following discussion:
    http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09

    It works like uuid.uuid4 except that tries to use os.urandom() if possible
    and it XORs the output with the tokens uniquely associated with this machine.
    """
    # Two 64-bit words from the pseudo-random generator, mixed with ctokens.
    high = random.getrandbits(64)
    low = random.getrandbits(64)

    if HAVE_URANDOM:
        # Fold in the 16 bytes obtained from fast_urandom16().
        urand_longs = struct.unpack('=QQ', fast_urandom16())
        high ^= urand_longs[0]
        low ^= urand_longs[1]

    byte_s = struct.pack('=QQ', high ^ ctokens[0], low ^ ctokens[1])
    return str(uuid.UUID(bytes=byte_s, version=4))
def test_random_read_write(self):
    """Test random read/write"""
    q = Queue(self.path)
    pending = 0  # items put but not yet fetched

    for _ in range(1000):
        if random.random() >= 0.5:
            # Write branch: enqueue a randomly named item.
            q.put('var%d' % random.getrandbits(16))
            pending += 1
            continue

        # Read branch: reading succeeds only while something is pending.
        if pending > 0:
            q.get_nowait()
            q.task_done()
            pending -= 1
        else:
            with self.assertRaises(Empty):
                q.get_nowait()
def run(self, q_job):
    """Run circuits in q_job

    A fresh ``Simulator`` is created for the job. When the qobj config has a
    ``seed`` it is stored and used to build the ``CppSim`` backend; otherwise
    a random 32-bit seed is stored. Returns a ``Result`` holding the job id,
    the per-circuit results and a ``COMPLETED`` status.
    """
    # Generating a string id for the job
    job_id = str(uuid.uuid4())

    qobj = q_job.qobj
    config = qobj['config']

    self._sim = Simulator(gate_fusion=True)
    if 'seed' in config:
        self._seed = config['seed']
        self._sim._simulator = CppSim(self._seed)
    else:
        self._seed = random.getrandbits(32)
    self._shots = config['shots']

    result_list = [self.run_circuit(circuit) for circuit in qobj['circuits']]

    payload = {
        'job_id': job_id,
        'result': result_list,
        'status': 'COMPLETED',
    }
    return Result(payload, qobj)
def test_append_rules(self, asa):
    """Append 350 randomly generated TCP/UDP rules to the test ACL in one call."""

    def random_endpoint():
        # Coin flip between a single host address and a network in CIDR form.
        if bool(getrandbits(1)):
            return IPAddress(randint(0, 4294967295))
        return IPNetwork(
            f"{IPAddress(randint(0, 4294967295))}/{randint(0, 31)}").cidr

    rules = []
    for src_port in range(1, 351):
        protocol = choice(["tcp", "udp"])
        src = random_endpoint()
        dst = random_endpoint()
        dst_port = randint(1, 65535)
        src_comp = choice(list(ServiceComparator))
        dst_comp = choice(list(ServiceComparator))
        rules.append(RuleTCPUDP(
            protocol=protocol, src=src, dst=dst,
            src_port=src_port, dst_port=dst_port,
            src_comp=src_comp, dst_comp=dst_comp))

    asa.acl.append_rules(settings.test_acl, rules)
def run(self):
    """Drive the coordinator loop until ``self._stopping`` is set.

    Returns immediately when the coordinator is not enabled. Each iteration
    generates actions for agents, queues one random action on a coin flip,
    culls completed requests and then waits for the loop interval.
    """
    coordinator = self.coordinator
    if not coordinator.enabled:
        return

    log.debug("Coordinator thread for %s starting: %s" %
              (coordinator.filesystem, threading.current_thread()))

    stopping = self._stopping
    while not stopping.is_set():
        # Keep agents busy
        self.generate_actions_for_agents()

        # Stack up waiting actions
        if bool(random.getrandbits(1)):
            self.unassigned_requests.put(self.get_random_action())

        self.cull_completed_requests()

        # TODO: Very infrequently, randomly cancel an agent action

        stopping.wait(HSM_COORDINATOR_LOOP_INTERVAL)
def image_read(self, imname):
    """Load an image as a 299x299x3 float array scaled to [-1, 1].

    Images with a side shorter than 299 are resized to 299x299; images that
    are otherwise larger than 299 on some side are centre-cropped. When
    ``self.random_noise`` is set, a coin flip decides whether per-element
    +/-eps noise is added, after which the result is clipped to [-1, 1].

    Args:
        imname: path of the image file handed to ``misc.imread``.

    Returns:
        numpy.ndarray of shape (299, 299, 3).
    """
    # np.float was only an alias of the builtin float (float64) and has been
    # removed from recent NumPy releases; name the dtype explicitly.
    image = misc.imread(imname, mode='RGB').astype(np.float64)
    r, c, _ = image.shape
    if r < 299 or c < 299:
        # TODO: check too small images
        image = misc.imresize(image, (299, 299, 3))
    elif r > 299 or c > 299:
        # Floor division keeps the slice bounds integral; true division
        # produces floats, which are not valid indices.
        top = (r - 299) // 2
        left = (c - 299) // 2
        image = image[top:top + 299, left:left + 299, :]
    assert image.shape == (299, 299, 3)

    # Map pixel values from [0, 255] to [-1, 1].
    image = (image / 255.0) * 2.0 - 1.0

    if self.random_noise:
        add_noise = bool(random.getrandbits(1))
        if add_noise:
            eps = random.choice([4.0, 8.0, 12.0, 16.0]) / 255.0 * 2.0
            noise_image = image + eps * np.random.choice([-1, 1], (299,299,3))
            image = np.clip(noise_image, -1.0, 1.0)
    return image
def AdminLogin(req,onlyhead):
    """Check the administrator password and, on success, issue a login cookie.

    The submitted ``passwd`` query value is hashed with ``crypt`` using the
    first two characters of the configured password hash as salt. On a
    mismatch the wrong-password page is delegated to. On a match a fresh
    random cookie value is stored in the module-level ``currentcookie`` and
    sent with the admin menu page.

    Returns:
        ``(header, content)`` of the admin menu with ``Set-Cookie`` added, or
        whatever ``Delegate`` returns for the wrong-password page.
    """
    global currentcookie
    passwd = req.query.get('passwd',[''])[0]
    salt = Config.conf['AdministratorPassword'][:2]
    passwd = crypt.crypt(passwd,salt)
    if passwd != Config.conf['AdministratorPassword']:
        return Delegate('/errors/wrongpasswd.html',req,onlyhead)

    # Draw the session secret straight from the operating system's random
    # source rather than the Mersenne Twister; this also leaves the shared
    # module-level generator's state untouched.
    currentcookie = str(random.SystemRandom().getrandbits(200))

    handler = EH_Generic_class()
    handler.iamadmin = 1
    (header,content) = Site['/adminmenu.html'].getresult(req,onlyhead,handler)
    header['Set-Cookie'] = 'OKUSON='+currentcookie+ \
         ';Path=/;Max-Age=3600;Version=1'
         # Max-Age is one hour
    #header['Location'] = '/adminmenu.html'
    # Taken out to please opera, which does not get the cookie for the
    # login with this header. Max.
    return (header,content)
def uniform_random(lower_bound, upper_bound):
    """
    Question 5.10: Generates uniform random number within lower and upper
    bounds, inclusive

    The number is assembled one random bit at a time; candidates that fall
    outside the requested range are discarded and redrawn.

    Args:
        lower_bound: smallest value that may be returned.
        upper_bound: largest value that may be returned.

    Returns:
        An integer ``n`` with ``lower_bound <= n <= upper_bound``.

    Raises:
        ValueError: if ``upper_bound`` is smaller than ``lower_bound``.
    """
    if upper_bound < lower_bound:
        # Without this guard no candidate is ever accepted and the loop
        # below would spin forever.
        raise ValueError("upper_bound must not be smaller than lower_bound")

    num_outcomes = upper_bound - lower_bound + 1
    while True:
        result = 0
        i = 0
        # Draw just enough bits to cover num_outcomes distinct values.
        while 1 << i < num_outcomes:
            result = (result << 1) | getrandbits(1)
            i += 1
        if result < num_outcomes:
            return result + lower_bound
def generateChallengeResponseV2(password, user, server_challenge, server_info, domain = '', client_challenge = None):
    """Compute the NTLMv2 challenge responses and the session key.

    Args:
        password: account password (text); hashed with MD4 over UTF-16LE.
        user: user name (text); upper-cased before hashing.
        server_challenge: challenge bytes received from the server.
        server_info: bytes appended after the encoded domain in the
            NT response input.
        domain: domain name (text).
        client_challenge: 8 bytes; 8 random bytes are generated when falsy.

    Returns:
        tuple: ``(nt_challenge_response, lm_challenge_response, session_key)``.
    """
    import hashlib

    # Byte-string literals and an explicit digest keep this working on both
    # Python 2 and 3: str and bytes cannot be mixed on Python 3, and
    # hmac.new() no longer falls back to MD5 when digestmod is omitted.
    client_timestamp = b'\0' * 8

    if not client_challenge:
        client_challenge = bytes(bytearray(random.getrandbits(8) for _ in range(8)))
    assert len(client_challenge) == 8

    d = MD4()
    d.update(password.encode('UTF-16LE'))
    ntlm_hash = d.digest()   # The NT password hash
    response_key = hmac.new(ntlm_hash, (user.upper() + domain).encode('UTF-16LE'), hashlib.md5).digest()  # The NTLMv2 password hash. In [MS-NLMP], this is the result of NTOWFv2 and LMOWFv2 functions
    temp = client_timestamp + client_challenge + domain.encode('UTF-16LE') + server_info

    nt_challenge_response = hmac.new(response_key, server_challenge + temp, hashlib.md5).digest()
    lm_challenge_response = hmac.new(response_key, server_challenge + client_challenge, hashlib.md5).digest() + client_challenge
    session_key = hmac.new(response_key, nt_challenge_response, hashlib.md5).digest()

    return nt_challenge_response, lm_challenge_response, session_key


################
# NTLMv1 Methods
################
def __init__(self, url, mode = "r"):
    """Initialise the file state for ``url`` and pick a temporary file path.

    Args:
        url: location handed to ``connect``, which yields the remote, share
            and path components stored on the instance.
        mode: file mode string; interpreted by ``self.__get_mode__()``.

    The temporary path lives in the XBMC temp directory when the ``xbmc``
    module can be imported; otherwise in the directory named by ``TEMP``,
    ``TMP`` or ``TMPDIR``, falling back to the system temporary directory.
    """
    import random
    import tempfile
    try:
        import xbmc
    except Exception:
        # Running outside XBMC; still let KeyboardInterrupt/SystemExit through.
        xbmc = None

    self.url = url
    self.remote, self.share, self.path = connect(url)
    self.mode = mode
    self.binary = False
    self.canread = False
    self.canwrite = False
    self.closed = True
    self.size = 0
    self.pos = 0

    if xbmc:
        tmp_dir = xbmc.translatePath("special://temp/")
    else:
        # None of the variables may be set; os.path.join() cannot take None,
        # so fall back to the directory the standard library would choose.
        tmp_dir = (os.getenv("TEMP") or os.getenv("TMP") or os.getenv("TMPDIR")
                   or tempfile.gettempdir())
    self.tmp_path = os.path.join(tmp_dir, "%08x" % (random.getrandbits(32)))
    self.tmp_file = None
    self.__get_mode__()
def dialog_select(self, heading, list):
    """Send a selection dialog as an RSS document and wait for the answer.

    One ``<item>`` is emitted per option; its link ends with a random dialog
    id followed by the option's position in ``list``. After sending, the
    current request is shut down and ``self.controller.get_data`` is polled
    for that id until it returns something truthy.

    Args:
        heading: text used as the channel title.
        list: sequence of option titles (text).

    Returns:
        int: the value the controller stored for the dialog id.
    """
    ID = "%032x" %(random.getrandbits(128))
    link_prefix = ('http://'+self.controller.host+'/data/'
                   +threading.current_thread().name +'/'+ID+'/')

    parts = [
        '<?xml version="1.0" encoding="UTF-8" ?>\n',
        '<rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/">\n',
        '<channel>\n',
        '<link>/rss</link>\n',
        '<title>'+heading+'</title>\n',
    ]
    # enumerate() gives each entry its own position, so equal option texts
    # no longer all point at the first match (and no linear search is needed).
    for position, option in enumerate(list):
        parts.append('<item>\n')
        parts.append('<title>'+option +'</title>\n')
        # NOTE(review): the %s placeholder is emitted verbatim -- confirm
        # whether it is meant to be filled in.
        parts.append('<image>%s</image>\n')
        parts.append('<link>'+link_prefix+str(position)+'</link>\n')
        parts.append('</item>\n\n')
    parts.append('</channel>\n')
    parts.append('</rss>\n')
    response = ''.join(parts)

    self.controller.send_data(response)
    self.handler.server.shutdown_request(self.handler.request)
    # Poll until an answer for this dialog id is available.
    while not self.controller.get_data(ID):
        continue
    return int(self.controller.get_data(ID))
def test_bool_1_chan_1_samp(self, x_series_device, seed):
    """Round-trip single boolean samples on one line via ``task.write``/``task.read``.

    Also checks that asking for one sample per channel explicitly yields a
    list of length one.
    """
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    line_name = random.choice(x_series_device.do_lines).name

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            line_name, line_grouping=LineGrouping.CHAN_PER_LINE)

        def write_then_read(value):
            task.write(value)
            time.sleep(0.001)
            return task.read()

        # Generate random values to test.
        values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]
        values_read = [write_then_read(value) for value in values_to_test]

        assert values_read == values_to_test

        # Verify setting number_of_samples_per_channel (even to 1)
        # returns a list.
        single_sample = task.read(number_of_samples_per_channel=1)
        assert isinstance(single_sample, list)
        assert len(single_sample) == 1
def test_bool_n_chan_1_samp(self, x_series_device, seed):
    """Round-trip one boolean sample per channel across several digital lines.

    Also checks that asking for one sample per channel explicitly yields a
    list of lists.
    """
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    all_lines = x_series_device.do_lines
    number_of_channels = random.randint(2, len(all_lines))
    chosen_lines = random.sample(all_lines, number_of_channels)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            flatten_channel_string([line.name for line in chosen_lines]),
            line_grouping=LineGrouping.CHAN_PER_LINE)

        # Generate random values to test.
        values_to_test = [
            bool(random.getrandbits(1)) for _ in range(number_of_channels)]

        task.write(values_to_test)
        time.sleep(0.001)
        values_read = task.read()

        assert values_read == values_to_test

        # Verify setting number_of_samples_per_channel (even to 1)
        # returns a list of lists.
        single_sample = task.read(number_of_samples_per_channel=1)
        assert isinstance(single_sample, list)
        assert isinstance(single_sample[0], list)
def test_uint_1_chan_1_samp(self, x_series_device, seed):
    """Round-trip single unsigned samples on one randomly chosen digital port.

    Also checks that asking for one sample per channel explicitly yields a
    list of length one.
    """
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    do_port = random.choice(x_series_device.do_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        def write_then_read(value):
            task.write(value)
            time.sleep(0.001)
            return task.read()

        # Generate random values to test.
        width = do_port.do_port_width
        values_to_test = [int(random.getrandbits(width)) for _ in range(10)]
        values_read = [write_then_read(value) for value in values_to_test]

        assert values_read == values_to_test

        # Verify setting number_of_samples_per_channel (even to 1)
        # returns a list.
        single_sample = task.read(number_of_samples_per_channel=1)
        assert isinstance(single_sample, list)
        assert len(single_sample) == 1
def test_many_sample_port_byte(self, x_series_device, seed):
    """Write many byte-wide port samples and check that the last one is read back.

    A random number of samples (2 to 20) is written to a port of at most
    8 lines; every sample read afterwards is expected to equal the final
    value written.
    """
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    sample_count = random.randint(2, 20)
    byte_ports = [
        port for port in x_series_device.do_ports if port.do_port_width <= 8]
    do_port = random.choice(byte_ports)

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        width = do_port.do_port_width
        raw_values = [
            int(random.getrandbits(width)) for _ in range(sample_count)]
        values_to_test = numpy.array(raw_values, dtype=numpy.uint8)

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        task.start()

        writer.write_many_sample_port_byte(values_to_test)
        time.sleep(0.001)

        # Since we're writing to and reading from ONLY the digital
        # output lines, we can't use sample clocks to correlate the
        # read and write sampling times. Thus, we essentially read
        # the last value written multiple times.
        values_read = numpy.zeros(sample_count, dtype=numpy.uint8)
        reader.read_many_sample_port_byte(
            values_read, number_of_samples_per_channel=sample_count)

        expected_values = [values_to_test[-1]] * sample_count
        numpy.testing.assert_array_equal(values_read, expected_values)