Python os 模块,urandom() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.urandom()

项目:PGO-mapscan-opt    作者:seikur0    | 项目源码 | 文件源码
def new_session(account):
    if account.get('session',None) is None:
        session = requests.session()

        session.verify = True
        session.headers.update({'User-Agent': 'Niantic App'})  # session.headers.update({'User-Agent': 'niantic'})
        if not account['proxy'] is None:
            session.proxies.update(account['proxy'])
        account['session'] = session
    else:
        account['session'].close()
        account['session'].cookies.clear()

    account['session_time'] = get_time()
    account['session_hash'] = os.urandom(32)

    account['api_url'] = API_URL
    account['auth_ticket'] = None
项目:transfert    作者:rbernand    | 项目源码 | 文件源码
def test_copy(tmpdir, storages):
    f = tmpdir.join('alpha')
    f.write_binary(os.urandom(1024 * 40))
    f_http = Resource(storages['http']('index.html'))
    f_file = Resource(storages['file'](f.strpath))
    f_sftp = Resource(storages['sftp']('/gamma'))
    f_ftp = Resource(storages['ftp']('/beta'))
    assert f_http.exists()
    delete_files(f_file, f_sftp, f_ftp)
    assert not f_file.exists()\
        and not f_sftp.exists()\
        and not f_ftp.exists()
    transfert.actions.copy(f_http, f_ftp, size=40960)
    assert f_ftp.exists() and f_http.exists()
    transfert.actions.copy(f_ftp, f_sftp, size=40960)
    assert f_ftp.exists() and f_sftp.exists()
    transfert.actions.copy(f_sftp, f_file, size=40960)
    assert f_sftp.exists() and f_file.exists()
项目:BitBot    作者:crack00r    | 项目源码 | 文件源码
def encrypt(self, data, offset=None, length=None):
        """Encrypts the given data with the current key"""
        if offset is None:
            offset = 0
        if length is None:
            length = len(data)

        with BinaryWriter() as writer:
            # Write SHA
            writer.write(sha1(data[offset:offset + length]).digest())
            # Write data
            writer.write(data[offset:offset + length])
            # Add padding if required
            if length < 235:
                writer.write(os.urandom(235 - length))

            result = int.from_bytes(writer.get_bytes(), byteorder='big')
            result = pow(result, self.e, self.m)

            # If the result byte count is less than 256, since the byte order is big,
            # the non-used bytes on the left will be 0 and act as padding,
            # without need of any additional checks
            return int.to_bytes(
                result, length=256, byteorder='big', signed=False)
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def seed(self, a=None):
        """Initialize internal state from hashable object.

        None or no argument seeds from current time or from an operating
        system specific randomness source if available.

        If a is not None or an int or long, hash(a) is used instead.
        """

        if a is None:
            try:
                a = long(_hexlify(_urandom(16)), 16)
            except NotImplementedError:
                import time
                a = long(time.time() * 256) # use fractional seconds

        super(Random, self).seed(a)
        self.gauss_next = None
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def uuid4():
    """Generate a random UUID."""

    # When the system provides a version-4 UUID generator, use it.
    if _uuid_generate_random:
        _buffer = ctypes.create_string_buffer(16)
        _uuid_generate_random(_buffer)
        return UUID(bytes=_buffer.raw)

    # Otherwise, get randomness from urandom or the 'random' module.
    try:
        import os
        return UUID(bytes=os.urandom(16), version=4)
    except:
        import random
        bytes = [chr(random.randrange(256)) for i in range(16)]
        return UUID(bytes=bytes, version=4)
项目:py-secretcrypt    作者:Zemanta    | 项目源码 | 文件源码
def _key():
    global __key
    if __key:
        return __key

    data_dir = _key_dir()
    key_file = os.path.join(data_dir, 'key')

    if os.path.isfile(key_file):
        with open(key_file, 'rb') as f:
            __key = base64.b64decode(f.read())
            return __key

    __key = base64.b64encode(os.urandom(16))
    try:
        os.makedirs(data_dir)
    except OSError as e:
        # errno17 == dir exists
        if e.errno != 17:
            raise
    with open(key_file, 'wb') as f:
        f.write(__key)
    return __key
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def yandex(url):
    try:
        cookie = client.request(url, output='cookie')

        r = client.request(url, cookie=cookie)
        r = re.sub(r'[^\x00-\x7F]+', ' ', r)

        sk = re.findall('"sk"\s*:\s*"([^"]+)', r)[0]

        idstring = re.findall('"id"\s*:\s*"([^"]+)', r)[0]

        idclient = binascii.b2a_hex(os.urandom(16))

        post = {'idClient': idclient, 'version': '3.9.2', 'sk': sk, '_model.0': 'do-get-resource-url', 'id.0': idstring}
        post = urllib.urlencode(post)

        r = client.request('https://yadi.sk/models/?_m=do-get-resource-url', post=post, cookie=cookie)
        r = json.loads(r)

        url = r['models'][0]['data']['file']

        return url
    except:
        return
项目:PyPPSPP    作者:justas-    | 项目源码 | 文件源码
def create_file(path, size):
    """Create a file with random contents and a given size"""

    print('Creating file: {} having a size of {} Bytes'.format(path, size))

    t_start = time.time()

    with open(path, mode='wb') as fp:

        # Write integer number of blocks
        data_left = size
        for _ in range(size // DATA_BLOCK):
            fp.write(os.urandom(DATA_BLOCK))
            data_left -= DATA_BLOCK

        # Write the remainder (if any)
        if data_left > 0:
            fp.write(os.urandom(data_left))

    print('File created in {} sec.'.format(time.time()-t_start))
项目:dnsbrute    作者:XiphosResearch    | 项目源码 | 文件源码
def check_for_wildcards(args, server, name, rectype, tries=4):
    """
    Verify that the DNS server doesn't return wildcard results for domains
    which don't exist, it should correctly return NXDOMAIN.
    """
    resolver = Resolver()
    resolver.timeout = args.timeout
    resolver.lifetime = args.timeout
    resolver.nameservers = [server]
    nx_names = [base64.b32encode(
                    os.urandom(
                        random.randint(8, 10))
                ).strip('=').lower() + name
                for _ in range(0, tries)]
    correct_result_count = 0
    for check_nx_name in nx_names:
        try:
            result = resolver.query(check_nx_name, rectype)            
            return False  # Any valid response = immediate fail!
        except (NXDOMAIN, NoNameservers):
            correct_result_count += 1
        except DNSException:
            continue        
    return correct_result_count > (tries / 2.0)
项目:pogom-linux    作者:PokeHunterProject    | 项目源码 | 文件源码
def __init__(self, auth_provider, device_info=None):

        self.log = logging.getLogger(__name__)

        self._auth_provider = auth_provider

        # mystical unknown6 - resolved by PokemonGoDev
        self._signal_agglom_gen = False
        self._signature_lib = None

        if RpcApi.START_TIME == 0:
            RpcApi.START_TIME = get_time(ms=True)

        if RpcApi.RPC_ID == 0:
            RpcApi.RPC_ID = int(random.random() * 10 ** 18)
            self.log.debug('Generated new random RPC Request id: %s', RpcApi.RPC_ID)

        # data fields for unknown6
        self.session_hash = os.urandom(32)
        self.token2 = random.randint(1,59)
        self.course = random.uniform(0, 360)

        self.device_info = device_info
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def read_random_bits(nbits):
    '''Reads 'nbits' random bits.

    If nbits isn't a whole number of bytes, an extra byte will be appended with
    only the lower bits set.
    '''

    nbytes, rbits = divmod(nbits, 8)

    # Get the random bytes
    randomdata = os.urandom(nbytes)

    # Add the remaining random bits
    if rbits > 0:
        randomvalue = ord(os.urandom(1))
        randomvalue >>= (8 - rbits)
        randomdata = byte(randomvalue) + randomdata

    return randomdata
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _make_flow(request, scopes, return_url=None):
    """Creates a Web Server Flow"""
    # Generate a CSRF token to prevent malicious requests.
    csrf_token = hashlib.sha256(os.urandom(1024)).hexdigest()

    request.session[_CSRF_KEY] = csrf_token

    state = json.dumps({
        'csrf_token': csrf_token,
        'return_url': return_url,
    })

    flow = client.OAuth2WebServerFlow(
        client_id=django_util.oauth2_settings.client_id,
        client_secret=django_util.oauth2_settings.client_secret,
        scope=scopes,
        state=state,
        redirect_uri=request.build_absolute_uri(
            urlresolvers.reverse("google_oauth:callback")))

    flow_key = _FLOW_KEY.format(csrf_token)
    request.session[flow_key] = pickle.dumps(flow)
    return flow
项目:anonymine    作者:oskar-skog    | 项目源码 | 文件源码
def init_field(self, startpoint):
        '''Place the mines and reveal the starting point.

        Internal details:
            It will wrap in `init_field2` in guessless mode.
            It will place the mines by itself when not in guessless
            mode.
        '''
        if self.guessless:
            # Wrap in the best version.
            self.init_field2(startpoint)
        else:
            safe = self.field.get_neighbours(startpoint) + [startpoint]
            cells = list(filter(
                lambda x: x not in safe,
                self.field.all_cells()
            ))
            # Choose self.n_mines randomly selected mines.
            cells.sort(key=lambda x: os.urandom(1))
            mines = cells[:self.n_mines]
            self.field.clear()
            self.field.fill(mines)
            self.field.reveal(startpoint)
项目:astrobase    作者:waqasbhatti    | 项目源码 | 文件源码
def newcursor(self, dictcursor=False):
        '''
        This creates a DB cursor for the current DB connection using a
        randomly generated handle. Returns a tuple with cursor and handle.

        dictcursor = True -> use a cursor where each returned row can be
                             addressed as a dictionary by column name

        '''

        handle = hashlib.sha256(os.urandom(12)).hexdigest()

        if dictcursor:
            self.cursors[handle] = self.connection.cursor(
                cursor_factory=psycopg2.extras.DictCursor
                )
        else:
            self.cursors[handle] = self.connection.cursor()

            return (self.cursors[handle], handle)
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def cluster_config(self):
        """
        Provide the default configuration for a cluster
        """
        if self.cluster:
            cluster_dir = "{}/config/stack/default/{}".format(self.root_dir, self.cluster)
            if not os.path.isdir(cluster_dir):
                _create_dirs(cluster_dir, self.root_dir)
            filename = "{}/cluster.yml".format(cluster_dir)
            contents = {}
            contents['fsid'] = str(uuid.uuid3(uuid.NAMESPACE_DNS, os.urandom(32)))

            public_networks_str = ", ".join([str(n) for n in self.public_networks])
            cluster_networks_str = ", ".join([str(n) for n in self.cluster_networks])

            contents['public_network'] = public_networks_str
            contents['cluster_network'] = cluster_networks_str
            contents['available_roles'] = self.available_roles

            self.writer.write(filename, contents)
项目:storjspec    作者:StorjRND    | 项目源码 | 文件源码
def test_send_receive(self):
        random.shuffle(self.swarm)
        senders = self.swarm[:len(self.swarm)/2]
        receivers = self.swarm[len(self.swarm)/2:]
        for sender, receiver in zip(senders, receivers):
            message = binascii.hexlify(os.urandom(64))

            # check queue previously empty
            self.assertFalse(bool(receiver.message_list()))

            # send message
            self.assertTrue(sender.message_send(receiver.dht_id(), message))

            # check received
            received = receiver.message_list()
            self.assertTrue(sender.dht_id() in received)
            messages = received[sender.dht_id()]
            self.assertTrue(len(messages) == 1)
            self.assertEqual(messages[0], message)

            # check queue empty after call to message_list
            self.assertFalse(bool(receiver.message_list()))
项目:storjspec    作者:StorjRND    | 项目源码 | 文件源码
def test_ordering(self):
        random.shuffle(self.swarm)
        sender = self.swarm[0]
        receiver = self.swarm[-1]

        # send messages
        message_alpha = binascii.hexlify(os.urandom(64))
        message_beta = binascii.hexlify(os.urandom(64))
        message_gamma = binascii.hexlify(os.urandom(64))
        self.assertTrue(sender.message_send(receiver.dht_id(), message_alpha))
        self.assertTrue(sender.message_send(receiver.dht_id(), message_beta))
        self.assertTrue(sender.message_send(receiver.dht_id(), message_gamma))

        # check received in order
        received = receiver.message_list()
        self.assertTrue(sender.dht_id() in received)
        messages = received[sender.dht_id()]
        self.assertEqual(messages[0], message_alpha)
        self.assertEqual(messages[1], message_beta)
        self.assertEqual(messages[2], message_gamma)
项目:storjspec    作者:StorjRND    | 项目源码 | 文件源码
def test_flood(self):

        # every node subscribes and should receive  the event
        topic = "test_flood_{0}".format(binascii.hexlify(os.urandom(32)))
        for peer in self.swarm:
            peer.pubsub_subscribe(topic)

        # wait until subscriptions propagate
        time.sleep(SLEEP_TIME)

        # send event
        peer = random.choice(self.swarm)
        event = binascii.hexlify(os.urandom(32))
        peer.pubsub_publish(topic, event)

        # wait until event propagates
        time.sleep(SLEEP_TIME)

        # check all peers received the event
        for peer in self.swarm:
            events = peer.pubsub_events(topic)
            self.assertEqual(events, [event])
项目:storjspec    作者:StorjRND    | 项目源码 | 文件源码
def test_multihop(self):
        random.shuffle(self.swarm)
        senders = self.swarm[:len(self.swarm) / 2]
        receivers = self.swarm[len(self.swarm) / 2:]
        for sender, receiver in zip(senders, receivers):

            # receiver subscribes to topic
            topic = "test_miltihop_{0}".format(binascii.hexlify(os.urandom(32)))
            receiver.pubsub_subscribe(topic)

            # wait until subscriptions propagate
            time.sleep(SLEEP_TIME)

            # send event
            event = binascii.hexlify(os.urandom(32))
            sender.pubsub_publish(topic, event)

            # wait until event propagates
            time.sleep(SLEEP_TIME)

            # check all peers received the event
            events = receiver.pubsub_events(topic)
            self.assertEqual(events, [event])
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def server_udp_pre_encrypt(self, buf, uid):
        if uid in self.server_info.users:
            user_key = self.server_info.users[uid]
        else:
            uid = None
            if not self.server_info.users:
                user_key = self.server_info.key
            else:
                user_key = self.server_info.recv_iv
        authdata = os.urandom(7)
        mac_key = self.server_info.key
        md5data = hmac.new(mac_key, authdata, self.hashfunc).digest()
        rand_len = self.udp_rnd_data_len(md5data, self.random_server)
        encryptor = encrypt.Encryptor(to_bytes(base64.b64encode(user_key)) + to_bytes(base64.b64encode(md5data)), 'rc4')
        out_buf = encryptor.encrypt(buf)
        buf = out_buf + os.urandom(rand_len) + authdata
        return buf + hmac.new(user_key, buf, self.hashfunc).digest()[:1]
项目:shadowsocksR-b    作者:hao35954514    | 项目源码 | 文件源码
def __init__(self, method, hashfunc):
        super(auth_aes128_sha1, self).__init__(method)
        self.hashfunc = hashfunc
        self.recv_buf = b''
        self.unit_len = 8100
        self.raw_trans = False
        self.has_sent_header = False
        self.has_recv_header = False
        self.client_id = 0
        self.connection_id = 0
        self.max_time_dif = 60 * 60 * 24 # time dif (second) setting
        self.salt = hashfunc == hashlib.md5 and b"auth_aes128_md5" or b"auth_aes128_sha1"
        self.no_compatible_method = hashfunc == hashlib.md5 and "auth_aes128_md5" or 'auth_aes128_sha1'
        self.extra_wait_size = struct.unpack('>H', os.urandom(2))[0] % 1024
        self.pack_id = 1
        self.recv_id = 1
        self.user_id = None
        self.user_key = None
        self.last_rnd_len = 0
        self.overhead = 9
项目:privcount    作者:privcount    | 项目源码 | 文件源码
def writeConfiguredCookieFile(self, cookie_string = None):
        '''
        Write a random 32-byte value to the configured cookie file.
        If cookie_string is not None, use that value.
        Return the value written to the file, or None if there is no cookie
        file, or if writing the file fails.
        '''
        cookie_file = self.getConfiguredCookieFile()
        if cookie_file is not None:
            if cookie_string is None:
                cookie_string = urandom(TorControlProtocol.SAFECOOKIE_LENGTH)
            try:
                with open(cookie_file, 'w') as f:
                    f.write(cookie_string)
            except IOError as e:
                logging.warning("Disabling SAFECOOKIE authentication, writing cookie file '{}' failed with error: {}"
                                .format(cookie_file, e))
                return None
            # sanity check: this will fail in write-only environments
            assert cookie_string == TorControlProtocol.readCookieFile(
                cookie_file)
            return cookie_string
        else:
            return None
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def test_io_and_many_files(self):
        import os
        from time import time

        start = time()
        for i in range(10000):  # ?????????????
            obj = os.urandom(16 * 1024)
            self.cache.put_obj(i, obj, info_dict={"bin": obj})
        for i in range(10000):
            info = self.cache.get_info(i)
            obj = self.cache.get_obj(i)
            self.assertEqual(info['bin'], obj)
        print("test_io_and_many_files IO total time:", time() - start)

        # test clean delete
        all_cache_file_path = [v[0] for v in self.cache.items_dict.values()]
        start = time()
        del self.cache
        print("test_io_and_many_files DELETE ALL total time:", time() - start)
        for path in all_cache_file_path:
            self.assertFalse(os.path.exists(path), msg=path)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_base64io_encode_file(tmpdir):
    source_plaintext = os.urandom(1024 * 1024)
    plaintext_b64 = base64.b64encode(source_plaintext)
    plaintext = tmpdir.join('plaintext')
    b64_plaintext = tmpdir.join('base64_plaintext')

    with open(str(plaintext), 'wb') as file:
        file.write(source_plaintext)

    with open(str(plaintext), 'rb') as source, open(str(b64_plaintext), 'wb') as target:
        with Base64IO(target) as encoder:
            for chunk in source:
                encoder.write(chunk)

    with open(str(b64_plaintext), 'rb') as file2:
        encoded = file2.read()

    assert encoded == plaintext_b64
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_encrypt_with_metadata_output_write_to_file(tmpdir):
    plaintext = tmpdir.join('source_plaintext')
    plaintext.write_binary(os.urandom(1024))
    ciphertext = tmpdir.join('ciphertext')
    metadata = tmpdir.join('metadata')

    encrypt_args = encrypt_args_template(metadata=True).format(
        source=str(plaintext),
        target=str(ciphertext),
        metadata='--metadata-output ' + str(metadata)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))

    raw_metadata = metadata.read()
    output_metadata = json.loads(raw_metadata)
    for key, value in (('a', 'b'), ('c', 'd')):
        assert output_metadata['header']['encryption_context'][key] == value
    assert output_metadata['mode'] == 'encrypt'
    assert output_metadata['input'] == str(plaintext)
    assert output_metadata['output'] == str(ciphertext)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_encrypt_with_metadata_full_file_path(tmpdir):
    plaintext_filename = 'source_plaintext'
    plaintext_file = tmpdir.join(plaintext_filename)
    plaintext_file.write_binary(os.urandom(1024))
    plaintext_file_full_path = str(plaintext_file)
    ciphertext_filename = 'ciphertext'
    ciphertext_file = tmpdir.join(ciphertext_filename)
    ciphertext_file_full_path = str(ciphertext_file)
    metadata = tmpdir.join('metadata')

    encrypt_args = encrypt_args_template(metadata=True).format(
        source=plaintext_filename,
        target=ciphertext_filename,
        metadata='--metadata-output ' + str(metadata)
    )

    with tmpdir.as_cwd():
        aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))

    raw_metadata = metadata.read()
    output_metadata = json.loads(raw_metadata)
    assert output_metadata['input'] == plaintext_file_full_path
    assert output_metadata['output'] == ciphertext_file_full_path
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_encrypt_with_metadata_output_write_to_stdout(tmpdir, capsys):
    plaintext = tmpdir.join('source_plaintext')
    plaintext.write_binary(os.urandom(1024))
    ciphertext = tmpdir.join('ciphertext')

    encrypt_args = encrypt_args_template(metadata=True).format(
        source=str(plaintext),
        target=str(ciphertext),
        metadata='--metadata-output -'
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))

    out, _err = capsys.readouterr()
    output_metadata = json.loads(out)
    for key, value in (('a', 'b'), ('c', 'd')):
        assert output_metadata['header']['encryption_context'][key] == value
    assert output_metadata['mode'] == 'encrypt'
    assert output_metadata['input'] == str(plaintext)
    assert output_metadata['output'] == str(ciphertext)
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_to_file_decrypt_required_encryption_context_success(tmpdir, required_encryption_context):
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    ) + ' --encryption-context ' + required_encryption_context

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_to_file_decrypt_required_encryption_context_fail(tmpdir, required_encryption_context):
    plaintext = tmpdir.join('source_plaintext')
    plaintext.write_binary(os.urandom(1024))
    ciphertext = tmpdir.join('ciphertext')
    metadata_file = tmpdir.join('metadata')
    decrypted = tmpdir.join('decrypted')

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template(metadata=True).format(
        source=str(ciphertext),
        target=str(decrypted),
        metadata=' --metadata-output ' + str(metadata_file)
    ) + ' --encryption-context ' + required_encryption_context

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert not decrypted.isfile()
    raw_metadata = metadata_file.read()
    parsed_metadata = json.loads(raw_metadata)
    assert parsed_metadata['skipped']
    assert parsed_metadata['reason'] == 'Missing encryption context key or value'
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_to_file_cycle(tmpdir):
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = tmpdir.join('ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_to_file_cycle_target_through_symlink(tmpdir):
    plaintext = tmpdir.join('source_plaintext')
    output_dir = tmpdir.mkdir('output')
    os.symlink(str(output_dir), str(tmpdir.join('output_link')))
    ciphertext = tmpdir.join('output_link', 'ciphertext')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(ciphertext)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_overwrite_source_file_to_file_custom_empty_prefix(tmpdir):
    plaintext_source = os.urandom(2014)
    plaintext = tmpdir.join('source_plaintext')
    with open(str(plaintext), 'wb') as f:
        f.write(plaintext_source)

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(plaintext)
    ) + ' --suffix'

    test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))

    assert test_result == 'Destination and source cannot be the same'

    with open(str(plaintext), 'rb') as f:
        assert f.read() == plaintext_source
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_overwrite_source_dir_to_dir_custom_empty_prefix(tmpdir):
    plaintext_source = os.urandom(2014)
    plaintext = tmpdir.join('source_plaintext')
    with open(str(plaintext), 'wb') as f:
        f.write(plaintext_source)

    encrypt_args = encrypt_args_template().format(
        source=str(tmpdir),
        target=str(tmpdir)
    ) + ' --suffix'

    test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))

    assert test_result == 'Destination and source cannot be the same'

    with open(str(plaintext), 'rb') as f:
        assert f.read() == plaintext_source
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_overwrite_source_file_to_dir_custom_empty_prefix(tmpdir):
    plaintext_source = os.urandom(2014)
    plaintext = tmpdir.join('source_plaintext')
    with open(str(plaintext), 'wb') as f:
        f.write(plaintext_source)

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(tmpdir)
    ) + ' --suffix'

    test_result = aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))

    assert test_result is None

    with open(str(plaintext), 'rb') as f:
        assert f.read() == plaintext_source
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_file_to_dir_cycle(tmpdir):
    inner_dir = tmpdir.mkdir('inner')
    plaintext = tmpdir.join('source_plaintext')
    ciphertext = inner_dir.join('source_plaintext.encrypted')
    decrypted = tmpdir.join('decrypted')
    with open(str(plaintext), 'wb') as f:
        f.write(os.urandom(1024))

    encrypt_args = encrypt_args_template().format(
        source=str(plaintext),
        target=str(inner_dir)
    )
    decrypt_args = decrypt_args_template().format(
        source=str(ciphertext),
        target=str(decrypted)
    )

    aws_encryption_sdk_cli.cli(shlex.split(encrypt_args, posix=not is_windows()))
    assert os.path.isfile(str(ciphertext))
    aws_encryption_sdk_cli.cli(shlex.split(decrypt_args, posix=not is_windows()))

    assert filecmp.cmp(str(plaintext), str(decrypted))
项目:aws-encryption-sdk-cli    作者:awslabs    | 项目源码 | 文件源码
def test_stdin_to_file_to_stdout_cycle(tmpdir):
    ciphertext_file = tmpdir.join('ciphertext')
    plaintext = os.urandom(1024)

    encrypt_args = 'aws-encryption-cli ' + encrypt_args_template(decode=True).format(
        source='-',
        target=str(ciphertext_file)
    )
    decrypt_args = 'aws-encryption-cli ' + decrypt_args_template(encode=True).format(
        source=str(ciphertext_file),
        target='-'
    )

    proc = Popen(shlex.split(encrypt_args, posix=not is_windows()), stdout=PIPE, stdin=PIPE, stderr=PIPE)
    _stdout, _stderr = proc.communicate(input=base64.b64encode(plaintext))

    proc = Popen(shlex.split(decrypt_args, posix=not is_windows()), stdout=PIPE, stdin=PIPE, stderr=PIPE)
    decrypted_stdout, _stderr = proc.communicate()

    assert base64.b64decode(decrypted_stdout) == plaintext
项目:djangoStatusPanel    作者:okar1    | 项目源码 | 文件源码
def encryptPassword(plaintext):
        try:
            letters = 'qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234567890????????????????????????????????????????????????????????????????'
            SALT_SIZE = 8
            salt = urandom(SALT_SIZE)
            randomText = ''.join([random.choice(letters) for i in range(8)])
            plaintext = "%3d" % len(plaintext) + plaintext + randomText
            plaintext = plaintext.encode('utf-8')
            arc4 = ARC4.new(salt)
            crypted = arc4.encrypt(plaintext)
            res = salt + crypted
            # print('salt=',salt)
            # print('cr=',crypted)
            return b64encode(res).decode('ascii')
        except:
            return ""
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _maybe_seed(self):
        if not self.seeded:
            try:
                seed = os.urandom(16)
            except:
                try:
                    r = open('/dev/urandom', 'rb', 0)
                    try:
                        seed = r.read(16)
                    finally:
                        r.close()
                except:
                    seed = str(time.time())
            self.seeded = True
            seed = bytearray(seed)
            self.stir(seed, True)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def seed(self, a=None):
        """Initialize internal state from hashable object.

        None or no argument seeds from current time or from an operating
        system specific randomness source if available.

        If a is not None or an int or long, hash(a) is used instead.
        """

        if a is None:
            try:
                # Seed with enough bytes to span the 19937 bit
                # state space for the Mersenne Twister
                a = long(_hexlify(_urandom(2500)), 16)
            except NotImplementedError:
                import time
                a = long(time.time() * 256) # use fractional seconds

        super(Random, self).seed(a)
        self.gauss_next = None
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def touch(self, path, size=None, random=False, perm=None, time=None):
        """Simplify the dynamic creation of files or the updating of their
        modified time.  If a size is specified, then a file of that size
        will be created on the disk. If the file already exists, then the
        size= attribute is ignored (for safey reasons).

        if random is set to true, then the file created is actually
        created using tons of randomly generated content.  This is MUCH
        slower but nessisary for certain tests.

        """

        path = abspath(path)
        if not isdir(dirname(path)):
            mkdir(dirname(path), 0700)

        if not exists(path):
            size = strsize_to_bytes(size)

            if not random:
                f = open(path, "wb")
                if isinstance(size, int) and size > 0:
                    f.seek(size-1)
                    f.write("\0")
                f.close()

            else: # fill our file with randomly generaed content
                with open(path, 'wb') as f:
                    # Fill our file with garbage
                    f.write(urandom(size))

        # Update our path
        utime(path, time)

        if perm is not None:
            # Adjust permissions
            chmod(path, perm)

        # Return True
        return True
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def read_segments(segments, metadata):
    for segment in segments:
        yield manifest_structure.Segment(
            metadata.backupset_id,
            metadata.incremental,
            segment,
            0,
            0,
            os.urandom(20)
        )
项目:ekko    作者:openstack    | 项目源码 | 文件源码
def initialize(self):
        with open(self.manifest_file, 'wb', 4096) as f:
            f.write(os.urandom(20))
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def dsa_test():
    import os
    msg = str(random.randint(q,q+q)).encode('utf-8')
    sk = os.urandom(32)
    pk = publickey(sk)
    sig = signature(msg, sk, pk)
    return checkvalid(sig, msg, pk)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def crypto_sign_keypair(seed=None):
    """Return (verifying, secret) key from a given seed, or os.urandom(32)"""    
    if seed is None:
        seed = os.urandom(PUBLICKEYBYTES)
    else:
        warnings.warn("ed25519ll should choose random seed.",
                      RuntimeWarning)
    if len(seed) != 32:
        raise ValueError("seed must be 32 random bytes or None.")
    skbytes = seed
    vkbytes = djbec.publickey(skbytes)
    return Keypair(vkbytes, skbytes+vkbytes)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def random(self):
        """Get the next random number in the range [0.0, 1.0)."""
        return (int.from_bytes(_urandom(7), 'big') >> 3) * RECIP_BPF
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def getrandbits(self, k):
        """getrandbits(k) -> x.  Generates an int with k random bits."""
        if k <= 0:
            raise ValueError('number of bits must be greater than zero')
        if k != int(k):
            raise TypeError('number of bits should be an integer')
        numbytes = (k + 7) // 8                       # bits / 8 and rounded up
        x = int.from_bytes(_urandom(numbytes), 'big')
        return x >> (numbytes * 8 - k)                # trim excess bits
项目:PGO-mapscan-opt    作者:seikur0    | 项目源码 | 文件源码
def generate_signature(signature_plain, signature_lib):
    iv = os.urandom(32)

    output_size = ctypes.c_size_t()

    signature_lib.encrypt(signature_plain, len(signature_plain), iv, 32, None, ctypes.byref(output_size))
    output = (ctypes.c_ubyte * output_size.value)()
    signature_lib.encrypt(signature_plain, len(signature_plain), iv, 32, ctypes.byref(output), ctypes.byref(output_size))
    signature = b''.join(list(map(lambda x: six.int2byte(x), output)))
    return signature
项目:IotCenter    作者:panjanek    | 项目源码 | 文件源码
def start(self):
        self.logger.info("starting web server listening at https port {0}".format(self.httpsPort))
        dir = os.path.dirname(os.path.realpath(__file__))
        handlersArgs = dict(iotManager=self.iotManager)

        application = [
        (r'/(favicon.ico)', tornado.web.StaticFileHandler, {'path': dir + '/img'}),
        (r'/static/(.*)', tornado.web.StaticFileHandler, {'path': dir + '/static'}),
        (r'/upload/(.*)', handlers.AuthFileHandler, {'path': self.uploadDir}),
        (r'/img/(.*)', tornado.web.StaticFileHandler, {'path': dir + '/img'}),
        (r'/login', handlers.LoginWebHandler, dict(adminPasswordHash=self.adminPasswordHash)),
        (r'/logout', handlers.LogoutWebHandler),
        (r'/ws', handlers.WSHandler, handlersArgs),
        (r'/device/(.*)', handlers.DeviceWebHandler, handlersArgs),
        (r'/rss', handlers.RssWebHandler, handlersArgs),
        (r'/history', handlers.HistoryWebHandler, handlersArgs),
        (r'/devices', handlers.DevicesWebHandler, handlersArgs),
        (r'/video', handlers.VideoWebHandler, dict(localVideoPort=self.localVideoPort)),
        (r'/', handlers.HomeWebHandler, handlersArgs),
        ]

        self.logger.info("starting web server listening at http {0} (plain)".format(self.httpPort))
        self.httpsApp = tornado.web.Application(application, cookie_secret=os.urandom(32), compiled_template_cache=True)
        sslOptions={ "certfile": self.httpsCertFile, "keyfile": self.httpsKeyFile, "ssl_version": ssl.PROTOCOL_TLSv1 }
        if self.httpsChainFile:
            sslOptions["certfile"] = self.httpsChainFile
        self.logger.info("Using certificate file at {0}".format(sslOptions["certfile"]))
        self.httpsServer = tornado.httpserver.HTTPServer(self.httpsApp, ssl_options=sslOptions)
        self.httpsServer.listen(self.httpsPort)

        httpApplication = [
            (r'/rss', handlers.RssWebHandler, handlersArgs),
            (r'/', handlers.RedirectorHandler, dict(manager = self)),
            (r'/(.*)', tornado.web.StaticFileHandler, {'path': dir + '/plain' })
        ]

        self.httpApp = tornado.web.Application(httpApplication)
        self.httpServer = tornado.httpserver.HTTPServer(self.httpApp)
        self.httpServer.listen(self.httpPort)        

        tornado.ioloop.IOLoop.current().start()
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def _temporary_keychain():
    """
    This function creates a temporary Mac keychain that we can use to work with
    credentials. This keychain uses a one-time password and a temporary file to
    store the data. We expect to have one keychain per socket. The returned
    SecKeychainRef must be freed by the caller, including calling
    SecKeychainDelete.

    Returns a tuple of the SecKeychainRef and the path to the temporary
    directory that contains it.
    """
    # Unfortunately, SecKeychainCreate requires a path to a keychain. This
    # means we cannot use mkstemp to use a generic temporary file. Instead,
    # we're going to create a temporary directory and a filename to use there.
    # This filename will be 8 random bytes expanded into base64. We also need
    # some random bytes to password-protect the keychain we're creating, so we
    # ask for 40 random bytes.
    random_bytes = os.urandom(40)
    filename = base64.b64encode(random_bytes[:8]).decode('utf-8')
    password = base64.b64encode(random_bytes[8:])  # Must be valid UTF-8
    tempdirectory = tempfile.mkdtemp()

    keychain_path = os.path.join(tempdirectory, filename).encode('utf-8')

    # We now want to create the keychain itself.
    keychain = Security.SecKeychainRef()
    status = Security.SecKeychainCreate(
        keychain_path,
        len(password),
        password,
        False,
        None,
        ctypes.byref(keychain)
    )
    _assert_no_error(status)

    # Having created the keychain, we want to pass it off to the caller.
    return keychain, tempdirectory
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def dsa_test():
    import os
    msg = str(random.randint(q, q + q)).encode('utf-8')
    sk = os.urandom(32)
    pk = publickey(sk)
    sig = signature(msg, sk, pk)
    return checkvalid(sig, msg, pk)