我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.urandom()。
def new_session(account): if account.get('session',None) is None: session = requests.session() session.verify = True session.headers.update({'User-Agent': 'Niantic App'}) # session.headers.update({'User-Agent': 'niantic'}) if not account['proxy'] is None: session.proxies.update(account['proxy']) account['session'] = session else: account['session'].close() account['session'].cookies.clear() account['session_time'] = get_time() account['session_hash'] = os.urandom(32) account['api_url'] = API_URL account['auth_ticket'] = None
def test_copy(tmpdir, storages): f = tmpdir.join('alpha') f.write_binary(os.urandom(1024 * 40)) f_http = Resource(storages['http']('index.html')) f_file = Resource(storages['file'](f.strpath)) f_sftp = Resource(storages['sftp']('/gamma')) f_ftp = Resource(storages['ftp']('/beta')) assert f_http.exists() delete_files(f_file, f_sftp, f_ftp) assert not f_file.exists()\ and not f_sftp.exists()\ and not f_ftp.exists() transfert.actions.copy(f_http, f_ftp, size=40960) assert f_ftp.exists() and f_http.exists() transfert.actions.copy(f_ftp, f_sftp, size=40960) assert f_ftp.exists() and f_sftp.exists() transfert.actions.copy(f_sftp, f_file, size=40960) assert f_sftp.exists() and f_file.exists()
def encrypt(self, data, offset=None, length=None): """Encrypts the given data with the current key""" if offset is None: offset = 0 if length is None: length = len(data) with BinaryWriter() as writer: # Write SHA writer.write(sha1(data[offset:offset + length]).digest()) # Write data writer.write(data[offset:offset + length]) # Add padding if required if length < 235: writer.write(os.urandom(235 - length)) result = int.from_bytes(writer.get_bytes(), byteorder='big') result = pow(result, self.e, self.m) # If the result byte count is less than 256, since the byte order is big, # the non-used bytes on the left will be 0 and act as padding, # without need of any additional checks return int.to_bytes( result, length=256, byteorder='big', signed=False)
def seed(self, a=None): """Initialize internal state from hashable object. None or no argument seeds from current time or from an operating system specific randomness source if available. If a is not None or an int or long, hash(a) is used instead. """ if a is None: try: a = long(_hexlify(_urandom(16)), 16) except NotImplementedError: import time a = long(time.time() * 256) # use fractional seconds super(Random, self).seed(a) self.gauss_next = None
def uuid4(): """Generate a random UUID.""" # When the system provides a version-4 UUID generator, use it. if _uuid_generate_random: _buffer = ctypes.create_string_buffer(16) _uuid_generate_random(_buffer) return UUID(bytes=_buffer.raw) # Otherwise, get randomness from urandom or the 'random' module. try: import os return UUID(bytes=os.urandom(16), version=4) except: import random bytes = [chr(random.randrange(256)) for i in range(16)] return UUID(bytes=bytes, version=4)
def _key(): global __key if __key: return __key data_dir = _key_dir() key_file = os.path.join(data_dir, 'key') if os.path.isfile(key_file): with open(key_file, 'rb') as f: __key = base64.b64decode(f.read()) return __key __key = base64.b64encode(os.urandom(16)) try: os.makedirs(data_dir) except OSError as e: # errno17 == dir exists if e.errno != 17: raise with open(key_file, 'wb') as f: f.write(__key) return __key
def yandex(url): try: cookie = client.request(url, output='cookie') r = client.request(url, cookie=cookie) r = re.sub(r'[^\x00-\x7F]+', ' ', r) sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0] idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0] idclient = binascii.b2a_hex(os.urandom(16)) post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring} post = urllib.urlencode(post) r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie) r = json.loads(r) url = r['models'][0]['data']['file'] return url except: return
def create_file(path, size): """Create a file with random contents and a given size""" print('Creating file: {} having a size of {} Bytes'.format(path, size)) t_start = time.time() with open(path, mode='wb') as fp: # Write integer number of blocks data_left = size for _ in range(size // DATA_BLOCK): fp.write(os.urandom(DATA_BLOCK)) data_left -= DATA_BLOCK # Write the remainder (if any) if data_left > 0: fp.write(os.urandom(data_left)) print('File created in {} sec.'.format(time.time()-t_start))
def check_for_wildcards(args, server, name, rectype, tries=4): """ Verify that the DNS server doesn't return wildcard results for domains which don't exist, it should correctly return NXDOMAIN. """ resolver = Resolver() resolver.timeout = args.timeout resolver.lifetime = args.timeout resolver.nameservers = [server] nx_names = [base64.b32encode( os.urandom( random.randint(8, 10)) ).strip('=').lower() + name for _ in range(0, tries)] correct_result_count = 0 for check_nx_name in nx_names: try: result = resolver.query(check_nx_name, rectype) return False # Any valid response = immediate fail! except (NXDOMAIN, NoNameservers): correct_result_count += 1 except DNSException: continue return correct_result_count > (tries / 2.0)
def __init__(self, auth_provider, device_info=None): self.log = logging.getLogger(__name__) self._auth_provider = auth_provider # mystical unknown6 - resolved by PokemonGoDev self._signal_agglom_gen = False self._signature_lib = None if RpcApi.START_TIME == 0: RpcApi.START_TIME = get_time(ms=True) if RpcApi.RPC_ID == 0: RpcApi.RPC_ID = int(random.random() * 10 ** 18) self.log.debug('Generated new random RPC Request id: %s', RpcApi.RPC_ID) # data fields for unknown6 self.session_hash = os.urandom(32) self.token2 = random.randint(1,59) self.course = random.uniform(0, 360) self.device_info = device_info
def read_random_bits(nbits): '''Reads 'nbits' random bits. If nbits isn't a whole number of bytes, an extra byte will be appended with only the lower bits set. ''' nbytes, rbits = divmod(nbits, 8) # Get the random bytes randomdata = os.urandom(nbytes) # Add the remaining random bits if rbits > 0: randomvalue = ord(os.urandom(1)) randomvalue >>= (8 - rbits) randomdata = byte(randomvalue) + randomdata return randomdata
def _make_flow(request, scopes, return_url=None): """Creates a Web Server Flow""" # Generate a CSRF token to prevent malicious requests. csrf_token = hashlib.sha256(os.urandom(1024)).hexdigest() request.session[_CSRF_KEY] = csrf_token state = json.dumps({ 'csrf_token': csrf_token, 'return_url': return_url, }) flow = client.OAuth2WebServerFlow( client_id=django_util.oauth2_settings.client_id, client_secret=django_util.oauth2_settings.client_secret, scope=scopes, state=state, redirect_uri=request.build_absolute_uri( urlresolvers.reverse("google_oauth:callback"))) flow_key = _FLOW_KEY.format(csrf_token) request.session[flow_key] = pickle.dumps(flow) return flow
def init_field(self, startpoint): '''Place the mines and reveal the starting point. Internal details: It will wrap in `init_field2` in guessless mode. It will place the mines by itself when not in guessless mode. ''' if self.guessless: # Wrap in the best version. self.init_field2(startpoint) else: safe = self.field.get_neighbours(startpoint) + [startpoint] cells = list(filter( lambda x: x not in safe, self.field.all_cells() )) # Choose self.n_mines randomly selected mines. cells.sort(key=lambda x: os.urandom(1)) mines = cells[:self.n_mines] self.field.clear() self.field.fill(mines) self.field.reveal(startpoint)
def newcursor(self, dictcursor=False): ''' This creates a DB cursor for the current DB connection using a randomly generated handle. Returns a tuple with cursor and handle. dictcursor = True -> use a cursor where each returned row can be addressed as a dictionary by column name ''' handle = hashlib.sha256(os.urandom(12)).hexdigest() if dictcursor: self.cursors[handle] = self.connection.cursor( cursor_factory=psycopg2.extras.DictCursor ) else: self.cursors[handle] = self.connection.cursor() return (self.cursors[handle], handle)
def cluster_config(self): """ Provide the default configuration for a cluster """ if self.cluster: cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster) if not os.path.isdir(cluster_dir): _create_dirs(cluster_dir, self.root_dir) filename = "{}/cluster.yml".format(cluster_dir) contents = {} contents['fsid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, os.urandom(32))) public_networks_str = ", ".join([str(n) for n in self.public_networks]) cluster_networks_str = ", ".join([str(n) for n in self.cluster_networks]) contents['public_network'] = public_networks_str contents['cluster_network'] = cluster_networks_str contents['available_roles'] = self.available_roles self.writer.write(filename, contents)
def test_send_receive(self): random.shuffle(self.swarm) senders = self.swarm[:len(self.swarm)/2] receivers = self.swarm[len(self.swarm)/2:] for sender, receiver in zip(senders, receivers): message = binascii.hexlify(os.urandom(64)) # check queue previously empty self.assertFalse(bool(receiver.message_list())) # send message self.assertTrue(sender.message_send(receiver.dht_id(), message)) # check received received = receiver.message_list() self.assertTrue(sender.dht_id() in received) messages = received[sender.dht_id()] self.assertTrue(len(messages) == 1) self.assertEqual(messages[0], message) # check queue empty after call to message_list self.assertFalse(bool(receiver.message_list()))
def test_ordering(self): random.shuffle(self.swarm) sender = self.swarm[0] receiver = self.swarm[-1] # send messages message_alpha = binascii.hexlify(os.urandom(64)) message_beta = binascii.hexlify(os.urandom(64)) message_gamma = binascii.hexlify(os.urandom(64)) self.assertTrue(sender.message_send(receiver.dht_id(), message_alpha)) self.assertTrue(sender.message_send(receiver.dht_id(), message_beta)) self.assertTrue(sender.message_send(receiver.dht_id(), message_gamma)) # check received in order received = receiver.message_list() self.assertTrue(sender.dht_id() in received) messages = received[sender.dht_id()] self.assertEqual(messages[0], message_alpha) self.assertEqual(messages[1], message_beta) self.assertEqual(messages[2], message_gamma)
def test_flood(self): # every node subscribes and should receive the event topic = "test_flood_{0}".format(binascii.hexlify(os.urandom(32))) for peer in self.swarm: peer.pubsub_subscribe(topic) # wait until subscriptions propagate time.sleep(SLEEP_TIME) # send event peer = random.choice(self.swarm) event = binascii.hexlify(os.urandom(32)) peer.pubsub_publish(topic, event) # wait until event propagates time.sleep(SLEEP_TIME) # check all peers received the event for peer in self.swarm: events = peer.pubsub_events(topic) self.assertEqual(events, [event])
def test_multihop(self): random.shuffle(self.swarm) senders = self.swarm[:len(self.swarm) / 2] receivers = self.swarm[len(self.swarm) / 2:] for sender, receiver in zip(senders, receivers): # receiver subscribes to topic topic = "test_miltihop_{0}".format(binascii.hexlify(os.urandom(32))) receiver.pubsub_subscribe(topic) # wait until subscriptions propagate time.sleep(SLEEP_TIME) # send event event = binascii.hexlify(os.urandom(32)) sender.pubsub_publish(topic, event) # wait until event propagates time.sleep(SLEEP_TIME) # check all peers received the event events = receiver.pubsub_events(topic) self.assertEqual(events, [event])
def server_udp_pre_encrypt(self, buf, uid): if uid in self.server_info.users: user_key = self.server_info.users[uid] else: uid = None if not self.server_info.users: user_key = self.server_info.key else: user_key = self.server_info.recv_iv authdata = os.urandom(7) mac_key = self.server_info.key md5data = hmac.new(mac_key, authdata, self.hashfunc).digest() rand_len = self.udp_rnd_data_len(md5data, self.random_server) encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4') out_buf = encryptor.encrypt(buf) buf = out_buf + os.urandom(rand_len) + authdata return buf + hmac.new(user_key, buf, self.hashfunc).digest()[:1]
def __init__(self, method, hashfunc): super(auth_aes128_sha1, self).__init__(method) self.hashfunc = hashfunc self.recv_buf = b'' self.unit_len = 8100 self.raw_trans = False self.has_sent_header = False self.has_recv_header = False self.client_id = 0 self.connection_id = 0 self.max_time_dif = 60 * 60 * 24 # time dif (second) setting self.salt = hashfunc == hashlib.md5 and b"auth_aes128_md5" or b"auth_aes128_sha1" self.no_compatible_method = hashfunc == hashlib.md5 and "auth_aes128_md5" or 'auth_aes128_sha1' self.extra_wait_size = struct.unpack('>H', os.urandom(2))[0] % 1024 self.pack_id = 1 self.recv_id = 1 self.user_id = None self.user_key = None self.last_rnd_len = 0 self.overhead = 9
def writeConfiguredCookieFile(self, cookie_string = None): ''' Write a random 32-byte value to the configured cookie file. If cookie_string is not None, use that value. Return the value written to the file, or None if there is no cookie file, or if writing the file fails. ''' cookie_file = self.getConfiguredCookieFile() if cookie_file is not None: if cookie_string is None: cookie_string = urandom(TorControlProtocol.SAFECOOKIE_LENGTH) try: with open(cookie_file, 'w') as f: f.write(cookie_string) except IOError as e: logging.warning("Disabling SAFECOOKIE authentication, writing cookie file '{}' failed with error: {}" .format(cookie_file, e)) return None # sanity check: this will fail in write-only environments assert cookie_string == TorControlProtocol.readCookieFile( cookie_file) return cookie_string else: return None
def test_io_and_many_files(self): import os from time import time start = time() for i in range(10000): # ????????????? obj = os.urandom(16 * 1024) self.cache.put_obj(i, obj, info_dict={"bin": obj}) for i in range(10000): info = self.cache.get_info(i) obj = self.cache.get_obj(i) self.assertEqual(info['bin'], obj) print("test_io_and_many_files IO total time:", time() - start) # test clean delete all_cache_file_path = [v[0] for v in self.cache.items_dict.values()] start = time() del self.cache print("test_io_and_many_files DELETE ALL total time:", time() - start) for path in all_cache_file_path: self.assertFalse(os.path.exists(path), msg=path)
def test_base64io_encode_file(tmpdir): source_plaintext = os.urandom(1024 * 1024) plaintext_b64 = base64.b64encode(source_plaintext) plaintext = tmpdir.join('plaintext') b64_plaintext = tmpdir.join('base64_plaintext') with open(str(plaintext), 'wb') as file: file.write(source_plaintext) with open(str(plaintext), 'rb') as source, open(str(b64_plaintext), 'wb') as target: with Base64IO(target) as encoder: for chunk in source: encoder.write(chunk) with open(str(b64_plaintext), 'rb') as file2: encoded = file2.read() assert encoded == plaintext_b64
def test_encrypt_with_metadata_output_write_to_file(tmpdir): plaintext = tmpdir.join('source_plaintext') plaintext.write_binary(os.urandom(1024)) ciphertext = tmpdir.join('ciphertext') metadata = tmpdir.join('metadata') encrypt_args = encrypt_args_template(metadata=True).format( source=str(plaintext), target=str(ciphertext), metadata='--metadata-output ' + str(metadata) ) aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) raw_metadata = metadata.read() output_metadata = json.loads(raw_metadata) for key, value in (('a', 'b'), ('c', 'd')): assert output_metadata['header']['encryption_context'][key] == value assert output_metadata['mode'] == 'encrypt' assert output_metadata['input'] == str(plaintext) assert output_metadata['output'] == str(ciphertext)
def test_encrypt_with_metadata_full_file_path(tmpdir): plaintext_filename = 'source_plaintext' plaintext_file = tmpdir.join(plaintext_filename) plaintext_file.write_binary(os.urandom(1024)) plaintext_file_full_path = str(plaintext_file) ciphertext_filename = 'ciphertext' ciphertext_file = tmpdir.join(ciphertext_filename) ciphertext_file_full_path = str(ciphertext_file) metadata = tmpdir.join('metadata') encrypt_args = encrypt_args_template(metadata=True).format( source=plaintext_filename, target=ciphertext_filename, metadata='--metadata-output ' + str(metadata) ) with tmpdir.as_cwd(): aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) raw_metadata = metadata.read() output_metadata = json.loads(raw_metadata) assert output_metadata['input'] == plaintext_file_full_path assert output_metadata['output'] == ciphertext_file_full_path
def test_encrypt_with_metadata_output_write_to_stdout(tmpdir, capsys): plaintext = tmpdir.join('source_plaintext') plaintext.write_binary(os.urandom(1024)) ciphertext = tmpdir.join('ciphertext') encrypt_args = encrypt_args_template(metadata=True).format( source=str(plaintext), target=str(ciphertext), metadata='--metadata-output -' ) aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) out, _err = capsys.readouterr() output_metadata = json.loads(out) for key, value in (('a', 'b'), ('c', 'd')): assert output_metadata['header']['encryption_context'][key] == value assert output_metadata['mode'] == 'encrypt' assert output_metadata['input'] == str(plaintext) assert output_metadata['output'] == str(ciphertext)
def test_file_to_file_decrypt_required_encryption_context_success(tmpdir, required_encryption_context): plaintext = tmpdir.join('source_plaintext') ciphertext = tmpdir.join('ciphertext') decrypted = tmpdir.join('decrypted') with open(str(plaintext), 'wb') as f: f.write(os.urandom(1024)) encrypt_args = encrypt_args_template().format( source=str(plaintext), target=str(ciphertext) ) decrypt_args = decrypt_args_template().format( source=str(ciphertext), target=str(decrypted) ) + ' --encryption-context ' + required_encryption_context aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows())) assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_to_file_decrypt_required_encryption_context_fail(tmpdir, required_encryption_context): plaintext = tmpdir.join('source_plaintext') plaintext.write_binary(os.urandom(1024)) ciphertext = tmpdir.join('ciphertext') metadata_file = tmpdir.join('metadata') decrypted = tmpdir.join('decrypted') encrypt_args = encrypt_args_template().format( source=str(plaintext), target=str(ciphertext) ) decrypt_args = decrypt_args_template(metadata=True).format( source=str(ciphertext), target=str(decrypted), metadata=' --metadata-output ' + str(metadata_file) ) + ' --encryption-context ' + required_encryption_context aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows())) assert not decrypted.isfile() raw_metadata = metadata_file.read() parsed_metadata = json.loads(raw_metadata) assert parsed_metadata['skipped'] assert parsed_metadata['reason'] == 'Missing encryption context key or value'
def test_file_to_file_cycle(tmpdir): plaintext = tmpdir.join('source_plaintext') ciphertext = tmpdir.join('ciphertext') decrypted = tmpdir.join('decrypted') with open(str(plaintext), 'wb') as f: f.write(os.urandom(1024)) encrypt_args = encrypt_args_template().format( source=str(plaintext), target=str(ciphertext) ) decrypt_args = decrypt_args_template().format( source=str(ciphertext), target=str(decrypted) ) aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows())) assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_to_file_cycle_target_through_symlink(tmpdir): plaintext = tmpdir.join('source_plaintext') output_dir = tmpdir.mkdir('output') os.symlink(str(output_dir), str(tmpdir.join('output_link'))) ciphertext = tmpdir.join('output_link', 'ciphertext') decrypted = tmpdir.join('decrypted') with open(str(plaintext), 'wb') as f: f.write(os.urandom(1024)) encrypt_args = encrypt_args_template().format( source=str(plaintext), target=str(ciphertext) ) decrypt_args = decrypt_args_template().format( source=str(ciphertext), target=str(decrypted) ) aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows())) assert filecmp.cmp(str(plaintext), str(decrypted))
def test_file_overwrite_source_file_to_file_custom_empty_prefix(tmpdir): plaintext_source = os.urandom(2014) plaintext = tmpdir.join('source_plaintext') with open(str(plaintext), 'wb') as f: f.write(plaintext_source) encrypt_args = encrypt_args_template().format( source=str(plaintext), target=str(plaintext) ) + ' --suffix' test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) assert test_result == 'Destination and source cannot be the same' with open(str(plaintext), 'rb') as f: assert f.read() == plaintext_source
def test_file_overwrite_source_dir_to_dir_custom_empty_prefix(tmpdir): plaintext_source = os.urandom(2014) plaintext = tmpdir.join('source_plaintext') with open(str(plaintext), 'wb') as f: f.write(plaintext_source) encrypt_args = encrypt_args_template().format( source=str(tmpdir), target=str(tmpdir) ) + ' --suffix' test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) assert test_result == 'Destination and source cannot be the same' with open(str(plaintext), 'rb') as f: assert f.read() == plaintext_source
def test_file_overwrite_source_file_to_dir_custom_empty_prefix(tmpdir): plaintext_source = os.urandom(2014) plaintext = tmpdir.join('source_plaintext') with open(str(plaintext), 'wb') as f: f.write(plaintext_source) encrypt_args = encrypt_args_template().format( source=str(plaintext), target=str(tmpdir) ) + ' --suffix' test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) assert test_result is None with open(str(plaintext), 'rb') as f: assert f.read() == plaintext_source
def test_file_to_dir_cycle(tmpdir): inner_dir = tmpdir.mkdir('inner') plaintext = tmpdir.join('source_plaintext') ciphertext = inner_dir.join('source_plaintext.encrypted') decrypted = tmpdir.join('decrypted') with open(str(plaintext), 'wb') as f: f.write(os.urandom(1024)) encrypt_args = encrypt_args_template().format( source=str(plaintext), target=str(inner_dir) ) decrypt_args = decrypt_args_template().format( source=str(ciphertext), target=str(decrypted) ) aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows())) assert os.path.isfile(str(ciphertext)) aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows())) assert filecmp.cmp(str(plaintext), str(decrypted))
def test_stdin_to_file_to_stdout_cycle(tmpdir): ciphertext_file = tmpdir.join('ciphertext') plaintext = os.urandom(1024) encrypt_args = 'aws-encryption-cli ' + encrypt_args_template(decode=True).format( source='-', target=str(ciphertext_file) ) decrypt_args = 'aws-encryption-cli ' + decrypt_args_template(encode=True).format( source=str(ciphertext_file), target='-' ) proc = Popen(shlex.split(encrypt_args, posix=not is_windows()), stdout=PIPE, stdin=PIPE, stderr=PIPE) _stdout, _stderr = proc.communicate(input=base64.b64encode(plaintext)) proc = Popen(shlex.split(decrypt_args, posix=not is_windows()), stdout=PIPE, stdin=PIPE, stderr=PIPE) decrypted_stdout, _stderr = proc.communicate() assert base64.b64decode(decrypted_stdout) == plaintext
def encryptPassword(plaintext): try: letters = 'qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890????????????????????????????????????????????????????????????????' SALT_SIZE = 8 salt = urandom(SALT_SIZE) randomText = ''.join([random.choice(letters) for i in range(8)]) plaintext = "%3d" % len(plaintext) + plaintext + randomText plaintext = plaintext.encode('utf-8') arc4 = ARC4.new(salt) crypted = arc4.encrypt(plaintext) res = salt + crypted # print('salt=',salt) # print('cr=',crypted) return b64encode(res).decode('ascii') except: return ""
def _maybe_seed(self): if not self.seeded: try: seed = os.urandom(16) except: try: r = open('/dev/urandom', 'rb', 0) try: seed = r.read(16) finally: r.close() except: seed = str(time.time()) self.seeded = True seed = bytearray(seed) self.stir(seed, True)
def seed(self, a=None): """Initialize internal state from hashable object. None or no argument seeds from current time or from an operating system specific randomness source if available. If a is not None or an int or long, hash(a) is used instead. """ if a is None: try: # Seed with enough bytes to span the 19937 bit # state space for the Mersenne Twister a = long(_hexlify(_urandom(2500)), 16) except NotImplementedError: import time a = long(time.time() * 256) # use fractional seconds super(Random, self).seed(a) self.gauss_next = None
def touch(self, path, size=None, random=False, perm=None, time=None): """Simplify the dynamic creation of files or the updating of their modified time. If a size is specified, then a file of that size will be created on the disk. If the file already exists, then the size= attribute is ignored (for safey reasons). if random is set to true, then the file created is actually created using tons of randomly generated content. This is MUCH slower but nessisary for certain tests. """ path = abspath(path) if not isdir(dirname(path)): mkdir(dirname(path), 0700) if not exists(path): size = strsize_to_bytes(size) if not random: f = open(path, "wb") if isinstance(size, int) and size > 0: f.seek(size-1) f.write("\0") f.close() else: # fill our file with randomly generaed content with open(path, 'wb') as f: # Fill our file with garbage f.write(urandom(size)) # Update our path utime(path, time) if perm is not None: # Adjust permissions chmod(path, perm) # Return True return True
def read_segments(segments, metadata): for segment in segments: yield manifest_structure.Segment( metadata.backupset_id, metadata.incremental, segment, 0, 0, os.urandom(20) )
def initialize(self): with open(self.manifest_file, 'wb', 4096) as f: f.write(os.urandom(20))
def dsa_test(): import os msg = str(random.randint(q,q+q)).encode('utf-8') sk = os.urandom(32) pk = publickey(sk) sig = signature(msg, sk, pk) return checkvalid(sig, msg, pk)
def crypto_sign_keypair(seed=None): """Return (verifying, secret) key from a given seed, or os.urandom(32)""" if seed is None: seed = os.urandom(PUBLICKEYBYTES) else: warnings.warn("ed25519ll should choose random seed.", RuntimeWarning) if len(seed) != 32: raise ValueError("seed must be 32 random bytes or None.") skbytes = seed vkbytes = djbec.publickey(skbytes) return Keypair(vkbytes, skbytes+vkbytes)
def random(self): """Get the next random number in the range [0.0, 1.0).""" return (int.from_bytes(_urandom(7), 'big') >> 3) * RECIP_BPF
def getrandbits(self, k): """getrandbits(k) -> x. Generates an int with k random bits.""" if k <= 0: raise ValueError('number of bits must be greater than zero') if k != int(k): raise TypeError('number of bits should be an integer') numbytes = (k + 7) // 8 # bits / 8 and rounded up x = int.from_bytes(_urandom(numbytes), 'big') return x >> (numbytes * 8 - k) # trim excess bits
def generate_signature(signature_plain, signature_lib): iv = os.urandom(32) output_size = ctypes.c_size_t() signature_lib.encrypt(signature_plain, len(signature_plain), iv, 32, None, ctypes.byref(output_size)) output = (ctypes.c_ubyte * output_size.value)() signature_lib.encrypt(signature_plain, len(signature_plain), iv, 32, ctypes.byref(output), ctypes.byref(output_size)) signature = b''.join(list(map(lambda x: six.int2byte(x), output))) return signature
def start(self): self.logger.info("starting web server listening at https port {0}".format(self.httpsPort)) dir = os.path.dirname(os.path.realpath(__file__)) handlersArgs = dict(iotManager=self.iotManager) application = [ (r'/(favicon.ico)', tornado.web.StaticFileHandler, {'path': dir + '/img'}), (r'/static/(.*)', tornado.web.StaticFileHandler, {'path': dir + '/static'}), (r'/upload/(.*)', handlers.AuthFileHandler, {'path': self.uploadDir}), (r'/img/(.*)', tornado.web.StaticFileHandler, {'path': dir + '/img'}), (r'/login', handlers.LoginWebHandler, dict(adminPasswordHash=self.adminPasswordHash)), (r'/logout', handlers.LogoutWebHandler), (r'/ws', handlers.WSHandler, handlersArgs), (r'/device/(.*)', handlers.DeviceWebHandler, handlersArgs), (r'/rss', handlers.RssWebHandler, handlersArgs), (r'/history', handlers.HistoryWebHandler, handlersArgs), (r'/devices', handlers.DevicesWebHandler, handlersArgs), (r'/video', handlers.VideoWebHandler, dict(localVideoPort=self.localVideoPort)), (r'/', handlers.HomeWebHandler, handlersArgs), ] self.logger.info("starting web server listening at http {0} (plain)".format(self.httpPort)) self.httpsApp = tornado.web.Application(application, cookie_secret=os.urandom(32), compiled_template_cache=True) sslOptions={ "certfile": self.httpsCertFile, "keyfile": self.httpsKeyFile, "ssl_version": ssl.PROTOCOL_TLSv1 } if self.httpsChainFile: sslOptions["certfile"] = self.httpsChainFile self.logger.info("Using certificate file at {0}".format(sslOptions["certfile"])) self.httpsServer = tornado.httpserver.HTTPServer(self.httpsApp, ssl_options=sslOptions) self.httpsServer.listen(self.httpsPort) httpApplication = [ (r'/rss', handlers.RssWebHandler, handlersArgs), (r'/', handlers.RedirectorHandler, dict(manager = self)), (r'/(.*)', tornado.web.StaticFileHandler, {'path': dir + '/plain' }) ] self.httpApp = tornado.web.Application(httpApplication) self.httpServer = tornado.httpserver.HTTPServer(self.httpApp) self.httpServer.listen(self.httpPort) tornado.ioloop.IOLoop.current().start()
def _temporary_keychain(): """ This function creates a temporary Mac keychain that we can use to work with credentials. This keychain uses a one-time password and a temporary file to store the data. We expect to have one keychain per socket. The returned SecKeychainRef must be freed by the caller, including calling SecKeychainDelete. Returns a tuple of the SecKeychainRef and the path to the temporary directory that contains it. """ # Unfortunately, SecKeychainCreate requires a path to a keychain. This # means we cannot use mkstemp to use a generic temporary file. Instead, # we're going to create a temporary directory and a filename to use there. # This filename will be 8 random bytes expanded into base64. We also need # some random bytes to password-protect the keychain we're creating, so we # ask for 40 random bytes. random_bytes = os.urandom(40) filename = base64.b64encode(random_bytes[:8]).decode('utf-8') password = base64.b64encode(random_bytes[8:]) # Must be valid UTF-8 tempdirectory = tempfile.mkdtemp() keychain_path = os.path.join(tempdirectory, filename).encode('utf-8') # We now want to create the keychain itself. keychain = Security.SecKeychainRef() status = Security.SecKeychainCreate( keychain_path, len(password), password, False, None, ctypes.byref(keychain) ) _assert_no_error(status) # Having created the keychain, we want to pass it off to the caller. return keychain, tempdirectory
def dsa_test(): import os msg = str(random.randint(q, q + q)).encode('utf-8') sk = os.urandom(32) pk = publickey(sk) sig = signature(msg, sk, pk) return checkvalid(sig, msg, pk)