我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 Crypto.PublicKey.RSA.generate()。
def getHMACFunc(key, hex=True):
    """Build a callable that computes the HMAC of its input using **key**.

    :param key: The secret key passed to :func:`hmac.new`.
    :param bool hex: If True, the returned callable produces the hex-encoded
        digest; otherwise it produces the raw digest.
    :rtype: callable
    :returns: A function which can be used to generate HMACs.
    """
    # Key the HMAC once; every call works on a copy so the keyed base state
    # is reused and never mutated.
    base = hmac.new(key, digestmod=DIGESTMOD)

    def hmac_fn(value):
        mac = base.copy()
        mac.update(value)
        return mac.hexdigest() if hex else mac.digest()

    return hmac_fn
def generate(bits, progress_func=None):
    """
    Create a fresh private RSA key.

    This factory function can be used to produce a new host key or
    authentication key.

    @param bits: number of bits the generated key should be.
    @type bits: int
    @param progress_func: an optional function to call at key points in
        key generation (used by C{pyCrypto.PublicKey}).
    @type progress_func: function
    @return: new private key
    @rtype: L{RSAKey}
    """
    generated = RSA.generate(bits, rng.read, progress_func)
    # The wrapper is built from the public values (e, n); the private
    # components are then copied across one attribute at a time.
    wrapped = RSAKey(vals=(generated.e, generated.n))
    wrapped.d, wrapped.p, wrapped.q = generated.d, generated.p, generated.q
    return wrapped
def lambda_handler(event, context):
    """Handle a resource request and report the outcome via ``cfnresponse``.

    For a ``'Create'`` request a new 2048-bit RSA key pair is generated; the
    private key is encrypted with the KMS key named in
    ``event["ResourceProperties"]["KMSKey"]`` and uploaded to the bucket in
    ``event["ResourceProperties"]["KeyBucket"]`` under the name ``enc_key``.
    For any other request type the existing ``PhysicalResourceId`` is reused.

    :param dict event: Request payload; must contain ``RequestType`` and, for
        ``'Create'``, the ``ResourceProperties`` keys ``Region``, ``KMSKey``
        and ``KeyBucket``.
    :param context: Invocation context, forwarded unchanged to
        ``cfnresponse.send``.
    :returns: None. Success or failure is signalled through
        ``cfnresponse.send``.
    """
    try:
        if event['RequestType'] == 'Create':
            # Generate keys
            new_key = RSA.generate(2048)
            pub_key = new_key.publickey().exportKey(format='OpenSSH')
            priv_key = new_key.exportKey()
            # Encrypt private key
            kms = boto3.client(
                'kms', region_name=event["ResourceProperties"]["Region"])
            enc_key = kms.encrypt(
                KeyId=event["ResourceProperties"]["KMSKey"],
                Plaintext=priv_key)['CiphertextBlob']
            # The context manager guarantees the handle is flushed and closed
            # before the upload, even if the write raises.
            with open('/tmp/enc_key', 'wb') as f:
                f.write(enc_key)
            # Upload private key to S3
            s3 = boto3.client('s3')
            s3.upload_file(
                '/tmp/enc_key',
                event["ResourceProperties"]["KeyBucket"],
                'enc_key')
        else:
            pub_key = event['PhysicalResourceId']
        cfnresponse.send(event, context, cfnresponse.SUCCESS, {}, pub_key)
    except Exception:
        # Report every ordinary failure, but let KeyboardInterrupt/SystemExit
        # propagate instead of being swallowed by a bare ``except``.
        traceback.print_exc()
        cfnresponse.send(event, context, cfnresponse.FAILED, {}, '')
def generate_key(self, key_size=2048):
    """
    Create a new RSA key and an OAEP cipher bound to it.

    Parameters
    ----------
    key_size : int
        bits in RSA key; must be a multiple of 256

    Returns
    -------
    None
        The key is stored on ``self.key`` and the cipher on
        ``self.RSAcipher``.

    Raises
    ------
    ValueError
        If ``key_size`` is not divisible by 256.
    """
    remainder = key_size % 256
    if remainder:
        raise ValueError('RSA key size must be divisible by 256')

    rsa_key = RSA.generate(key_size)
    self.key = rsa_key  # RSA key
    # OAEP salts/pads things to be encrypted
    self.RSAcipher = PKCS1_OAEP.new(rsa_key)
def generate(bits, progress_func=None):
    """
    Create a fresh private RSA key.

    This factory function can be used to produce a new host key or
    authentication key.

    :param int bits: number of bits the generated key should be.
    :param function progress_func:
        an optional function to call at key points in key generation (used
        by ``pyCrypto.PublicKey``).
    :return: new `.RSAKey` private key
    """
    source = RSA.generate(bits, os.urandom, progress_func)
    # Build the wrapper from the public values, then copy the private ones.
    result = RSAKey(vals=(source.e, source.n))
    for component in ('d', 'p', 'q'):
        setattr(result, component, getattr(source, component))
    return result


###  internals...
def generate_key_pair():
    """Generate a 4096-bit RSA key pair and save both halves as PEM files.

    The private key is written to ``master_bot_private_key.pem`` and the
    public key to ``master_bot_public_key.pem``, both in the current working
    directory.

    :returns: None.
    """
    # Random number generator function.
    rng = Random.new().read
    # Generate a RSA key object by providing PyCrypto with
    # desired key size and a randfunc.
    # See docs: https://www.dlitz.net/software/pycrypto
    # /doc/#crypto-publickey-public-key-algorithms
    key = RSA.generate(4096, rng)

    # Create and save the private key in the project folder.
    # In practice, the private key would be stored locally with
    # the master bot and would not be accessible by anyone else.
    # However, this is irrelevant for our botnet simulation as everything
    # is local.
    # exportKey() already returns bytes, which is what a file opened in
    # binary mode expects; decoding it first raises TypeError on Python 3.
    with open('master_bot_private_key.pem', 'wb') as f:
        f.write(key.exportKey('PEM'))

    # Create and save the public key in the project folder.
    # In practice, the public key would be accessible from
    # the public server (in this case, the hypothetical pastebot.net).
    # However, this is irrelevant for our botnet simulation as everything
    # is local.
    with open('master_bot_public_key.pem', 'wb') as f:
        f.write(key.publickey().exportKey('PEM'))
def encrypt_file(filename, pubkey):
    """Encrypt a file with a fresh AES key and wrap that key with RSA-OAEP.

    Two files are written next to the input: ``<filename>.enc`` holds the
    AES-encrypted, padded contents and ``<filename>.key`` holds the AES key
    encrypted with ``pubkey``. The original file is left in place.

    :param str filename: Path of the file to encrypt.
    :param pubkey: RSA key object accepted by ``PKCS1_OAEP.new``.
    :returns: None.
    """
    # generate a key for symmetric encryption
    secretkey = gen_key()
    # ECB is what PyCrypto selected implicitly when no mode was given; naming
    # it keeps existing ciphertexts decryptable and also works with libraries
    # that require an explicit mode.
    # NOTE(review): ECB leaks plaintext patterns -- consider an authenticated
    # mode if the decrypting side can be changed at the same time.
    aes = AES.new(secretkey, AES.MODE_ECB)

    # Read as bytes: the content is arbitrary data, not text.
    with open(filename, 'rb') as infile:
        contents = infile.read()

    # pad contents so that the length is a multiple of 16; each padding byte
    # carries the padding length (a full block is added when already aligned)
    padding_bytes = 16 - (len(contents) % 16)
    contents += bytes(bytearray([padding_bytes]) * padding_bytes)

    # encrypt the file
    with open(filename + '.enc', 'wb') as outfile:
        outfile.write(aes.encrypt(contents))

    # encrypt the secret decryption key
    rsa = PKCS1_OAEP.new(pubkey)
    with open(filename + '.key', 'wb') as keyfile:
        keyfile.write(rsa.encrypt(secretkey))

    # delete the original file
    # os.remove(filename)
def generate(bits, progress_func=None):
    """
    Produce a brand-new private RSA key.

    This factory function can be used to create a new host key or
    authentication key.

    :param int bits: number of bits the generated key should be.
    :param function progress_func:
        an optional function to call at key points in key generation (used
        by ``pyCrypto.PublicKey``).
    :return: new `.RsaKey` private key
    """
    raw_key = RSA.generate(bits, os.urandom, progress_func)
    public_values = (raw_key.e, raw_key.n)
    rsa_key = RsaKey(vals=public_values)
    # Private components are copied over after construction.
    rsa_key.d, rsa_key.p, rsa_key.q = raw_key.d, raw_key.p, raw_key.q
    return rsa_key


###  internals...
def testCoppersmithShortPadAttack():
    """Drive ``CoppersmithShortPadAttack`` with two paddings of one message.

    A fixed flag string is converted to an integer, shifted left by ``m``
    bits and combined with two random low-order pads ``r1`` and ``r2``. Both
    padded messages are raised to ``e`` modulo ``key.n`` and the resulting
    ciphertexts are handed to ``CoppersmithShortPadAttack`` together with
    ``e`` and ``key.n``.

    NOTE(review): the ``_sage_const_*`` names and ``Integer`` look like Sage
    preparser output and are expected to come from the enclosing module --
    confirm. Only the modulus of the generated key is used here; the public
    exponent is the local ``e``.
    """
    from Crypto.PublicKey import RSA
    import random
    import math
    import binascii
    M = "flag{This_Msg_Is_2_1337}"
    # Interpret the hex encoding of the message as a base-16 integer.
    M = int(binascii.hexlify(M),_sage_const_16 )
    e = _sage_const_3
    nBitSize = _sage_const_4096
    key = RSA.generate(_sage_const_4096 )
    # Give a bit of room, otherwise the epsilon has to be tiny, and small
    # roots will take forever
    m = int(math.floor(nBitSize/(e*e))) - _sage_const_220
    # The pad width must exceed the bit length of the message.
    assert (m > len(bin(M)[_sage_const_2 :]))
    # r2 is drawn from [r1, 2**m], so it is never smaller than r1.
    r1 = random.randint(_sage_const_1 ,pow(_sage_const_2 ,m))
    r2 = random.randint(r1,pow(_sage_const_2 ,m))
    # Shift the message left by m bits and put the random pad in the low bits.
    M1 = pow(_sage_const_2 ,m)*M + r1
    M2 = pow(_sage_const_2 ,m)*M + r2
    C1 = Integer(pow(M1,e,key.n))
    C2 = Integer(pow(M2,e,key.n))
    CoppersmithShortPadAttack(e,key.n,C1,C2)
def keypair_create(self):
    """Create a keypair under a random name with a fresh 2048-bit public key.

    Returns the object produced by ``self.native.keypairs.create``, or None
    when that call raises (the exception is logged and its traceback
    printed).
    """
    # Keep drawing words until the keeper lookup for that name is falsy.
    name = self.faker.word()
    while self.keeper.get("nova", "keypairs", "name", lambda x: x == name):
        name = self.faker.word()

    public_key = RSA.generate(2048).publickey().exportKey('OpenSSH')
    try:
        log.info("Creating keypair with name {}".format(name))
        return self.native.keypairs.create(name=name, public_key=public_key)
    except Exception as exc:
        log.critical("Exception: {}".format(exc))
        traceback.print_exc()
        return None
def sign():
    """Generate a 2048-bit RSA key, sign a SHA-256 digest and print the result.

    The digest of the UTF-8 bytes of ``'abcdefgh'`` is passed to
    ``key.sign`` and the returned signature is printed. Nothing is returned.
    """
    from Crypto.Hash import SHA256
    from Crypto.PublicKey import RSA

    key = RSA.generate(2048)
    # Both halves are exported as ASCII PEM text; they are not used further.
    public = key.publickey().exportKey('PEM').decode('ascii')
    private = key.exportKey('PEM').decode('ascii')

    message = 'abcdefgh'.encode('utf-8')
    digest = SHA256.new(message).digest()
    signature = key.sign(digest, '')
    print('signature=', signature)


# Verify
# Knowing the public key, it is easy to verify a message.
# The plain text is sent to the user along with the signature.
# The receiving side calculates the hash value and then uses the public key
# verify() method to validate its origin.
def signature():
    """Sign a sample package via ``AccessClient`` and print the verify result.

    A fresh 2048-bit RSA key signs ``../samples/sonata-demo.son`` through
    ``AccessClient.sign_package``; the signature is then checked against the
    SHA-256 digest of the package bytes using the matching public key.
    """
    from son.access.access import AccessClient
    from son.workspace.workspace import Workspace
    from Crypto.PublicKey import RSA
    from Crypto.Hash import SHA256

    workspace = Workspace('~/workspace/ws1')
    client = AccessClient(workspace)
    rsa_key = RSA.generate(2048)

    # Alternative location: 'son/access/samples/sonata-demo.son'
    package_path = '../samples/sonata-demo.son'
    with open(package_path, 'rb') as package_file:
        package_bytes = package_file.read()
    package_digest = SHA256.new(package_bytes).digest()

    private_pem = rsa_key.exportKey().decode('utf-8')
    package_signature = client.sign_package(package_path,
                                            private_key=private_pem)

    verifier = rsa_key.publickey()
    # verify() receives the signature as a one-element tuple holding an int.
    print(verifier.verify(package_digest, (int(package_signature),)))
def test_initialize_diff_users_same_info(self):
    """ Verify that two distinct users recover an identical key when the
        attribute, version, metadata and key length all match.
    """
    first_key = RSA.generate(3072)
    first_pub = first_key.publickey()
    second_key = RSA.generate(3072)
    second_pub = second_key.publickey()

    keygen = KeyGen('Sixteen byte key')
    keystore = DummyKeyStore()

    # Both users are registered for exactly the same key description.
    users = {
        'user1': (first_pub, [('attr', 1, 'meta', 16)]),
        'user2': (second_pub, [('attr', 1, 'meta', 16)]),
    }
    keygen.initialize_users(users, keystore)

    wrapped_first = keystore.retrieve('user1', 'attr', 1, 'meta')
    wrapped_second = keystore.retrieve('user2', 'attr', 1, 'meta')

    self.assertEqual(utils.unwrap_key(wrapped_first, first_key),
                     utils.unwrap_key(wrapped_second, second_key))
def test_initialize_empty(self):
    """ Verify that keys derived from empty-string (or zero) inputs can be
        wrapped with a user's public key, stored in a keystore, retrieved
        and unwrapped again with that user's secret key.
    """
    private_key = RSA.generate(3072)
    public_key = private_key.publickey()
    keygen = KeyGen('Sixteen byte key')
    keystore = DummyKeyStore()

    # Each case blanks out one field; the last one blanks out all of them.
    cases = [
        ('', 'attr', 1, 'meta'),
        ('userid', '', 1, 'meta'),
        ('userid', 'attr', 0, 'meta'),
        ('userid', 'attr', 1, ''),
        ('', '', 0, ''),
    ]

    for userid, attr, vers, meta in cases:
        expected = keygen._generate_key(attr, vers, meta, 16)
        user_spec = {userid: (public_key, [(attr, vers, meta, 16)])}
        keygen.initialize_users(user_spec, keystore)
        wrapped = keystore.retrieve(userid, attr, vers, meta)
        self.assertEqual(utils.unwrap_key(wrapped, private_key), expected)
def test_invalid_enc_dec(self):
    """ Verify that a wrapped key cannot be recovered with a private key
        that does not match the public key used for wrapping.
    """
    wrapping_key = RSA.generate(3072)
    wrapping_pub = wrapping_key.publickey()

    for _ in range(self.num_iters):
        secret = format(random.getrandbits(128), 'b')
        wrapped = wrap_key(secret, wrapping_pub)
        unrelated_key = RSA.generate(3072)

        try:
            recovered = unwrap_key(wrapped, unrelated_key)
        except ValueError:
            # Unwrapping was rejected outright, which is an accepted outcome.
            self.assertTrue(True, 'error')
            continue

        self.assertNotEqual(recovered, secret,
                            'Decryption succeeded with invalid key')
def get_rsa_keys():
    """Return the manhole server's RSA key pair, creating it on first use.

    If either ``MANHOLE_SERVER_RSA_PUBLIC`` or ``MANHOLE_SERVER_RSA_PRIVATE``
    is missing, a new key pair is generated and written to both paths;
    otherwise the two files are read back.

    :returns: ``(public_key_str, private_key_str)`` -- the public key in
        OpenSSH format and the private key as produced by ``exportKey()``.
    """
    if not (os.path.exists(MANHOLE_SERVER_RSA_PUBLIC) and
            os.path.exists(MANHOLE_SERVER_RSA_PRIVATE)):
        # generate a RSA keypair
        log.info('generate-rsa-keypair')
        from Crypto.PublicKey import RSA
        # NOTE(review): 1024-bit RSA is below current recommendations; it is
        # kept so generated keys match what existing deployments expect.
        rsa_key = RSA.generate(1024)
        public_key_str = rsa_key.publickey().exportKey(format='OpenSSH')
        private_key_str = rsa_key.exportKey()

        # save keys for next time; ``open`` replaces the Python-2-only
        # ``file`` builtin and ``with`` closes the handles deterministically
        with open(MANHOLE_SERVER_RSA_PUBLIC, 'w+b') as key_file:
            key_file.write(public_key_str)
        with open(MANHOLE_SERVER_RSA_PRIVATE, 'w+b') as key_file:
            key_file.write(private_key_str)
        log.debug('saved-rsa-keypair', public=MANHOLE_SERVER_RSA_PUBLIC,
                  private=MANHOLE_SERVER_RSA_PRIVATE)
    else:
        # Read in binary mode so both branches return the same kind of value.
        with open(MANHOLE_SERVER_RSA_PUBLIC, 'rb') as key_file:
            public_key_str = key_file.read()
        with open(MANHOLE_SERVER_RSA_PRIVATE, 'rb') as key_file:
            private_key_str = key_file.read()
    return public_key_str, private_key_str
def encrypt_with_ecc(public_ecc_key, message, nonce=None):
    """Encrypt ``message`` for the holder of ``public_ecc_key``.

    Takes an elliptic curve instance (public_ecc_key) and a byte string
    (message), and outputs a ciphertext. An ephemeral key on the same curve
    is generated; the point obtained by multiplying the recipient's public
    point with the ephemeral private scalar is hashed with SHA-256 into an
    AES key, which is used in EAX mode.

    :param public_ecc_key: Recipient's key; must be an ``ECC.EccKey``.
    :param bytes message: Plaintext to encrypt.
    :param nonce: Optional nonce; when falsy, 16 random bytes are used.
    :returns: ``(ephemeral_pub_key, nonce, ciphertext, tag)`` where the
        ephemeral public key is exported in OpenSSH format.
    :raises AssertionError: If ``public_ecc_key`` is not an ``ECC.EccKey``.
    """
    assert isinstance(public_ecc_key, ECC.EccKey),\
        "public_ecc_key should be ECC key. Got {}".format(type(public_ecc_key))
    random_ecc_key = ECC.generate(curve=public_ecc_key.curve)
    new_point = public_ecc_key.pointQ * random_ecc_key.d

    # The hash input must be bytes on Python 3. The coordinates are rendered
    # as text, so ASCII-encoding yields the same bytes that Python 2 ``str``
    # supplied and the derived key is unchanged.
    h = SHA256.new(str(new_point.x).encode('ascii'))
    h.update(b'XXX' + str(new_point.y).encode('ascii'))  # 'XXX' is a delimiter
    key = h.digest()

    if not nonce:
        nonce = os.urandom(16)
    aes_engine = AES.new(key=key, mode=AES.MODE_EAX, nonce=nonce)
    ctx, tag = aes_engine.encrypt_and_digest(message)
    # Return: <ephemeral_pub_key>, <nonce>, <ciphertext>, <tag>
    return (random_ecc_key.public_key().export_key(format='OpenSSH'),
            aes_engine.nonce, ctx, tag)
def test_03_key_sizes(self):
    """
    Check that RSA keys below the minimum or above the maximum size are
    rejected by ``UserKey.clean()``.
    """
    minimum_bits = getattr(settings, 'SECRETS_MIN_PUBKEY_SIZE', 2048)

    # One step (256 bits) below the configured minimum.
    undersized = RSA.generate(minimum_bits - 256)
    small_key = undersized.publickey().exportKey('PEM')
    try:
        UserKey(public_key=small_key).clean()
    except ValidationError:
        pass
    else:
        self.fail("UserKey.clean() did not fail with an undersized RSA key")

    # Max size is 4096 (enforced by master_key_cipher field size)
    oversized = RSA.generate(4096 + 256)
    big_key = oversized.publickey().exportKey('PEM')
    try:
        UserKey(public_key=big_key).clean()
    except ValidationError:
        pass
    else:
        self.fail("UserKey.clean() did not fail with an oversized RSA key")
def list(self, request):
    """Generate a new RSA key pair and return both halves in PEM format.

    The ``key_size`` query parameter selects the key length. Accepted values
    are 2048 to 4096 in steps of 256; anything else, including a missing or
    non-numeric value, falls back to 2048.

    :param request: Incoming request; only ``request.GET`` is read.
    :returns: A ``Response`` with ``private_key`` and ``public_key`` entries.
    """
    # Determine what size key to generate. Query parameters arrive as
    # strings, so the value must be converted before the membership test --
    # otherwise the comparison never matches and the requested size is
    # always ignored.
    try:
        key_size = int(request.GET.get('key_size', 2048))
    except (TypeError, ValueError):
        key_size = 2048
    if key_size not in range(2048, 4097, 256):
        key_size = 2048

    # Export RSA private and public keys in PEM format
    key = RSA.generate(key_size)
    private_key = key.exportKey('PEM')
    public_key = key.publickey().exportKey('PEM')

    return Response({
        'private_key': private_key,
        'public_key': public_key,
    })
def get_form(self, req: HttpRequest, obj: Domain=None, **kwargs: Any) -> type:
    """Return the form class, optionally pre-filling a new DKIM key.

    When the request carries ``_prefill_key=1`` a form-field callback is
    installed that puts a freshly generated 2048-bit RSA private key (PEM
    text) into the ``dkimkey`` field: on ``obj`` when one is given, otherwise
    as the field's initial value.
    """
    prefill_requested = req.GET.get("_prefill_key", "0") == "1"
    if prefill_requested:
        def formfield_callback(field: _ModelField, request: HttpRequest=None, **kwargs: Any) -> Type[_FormField]:
            f = self.formfield_for_dbfield(field, request=request, **kwargs)  # type: _FormField
            # f can be None if the dbfield does not get a FormField (like
            # hidden fields or auto increment IDs). Only the dbfield has a
            # .name attribute.
            if f and field.name == "dkimkey":
                fresh_pem = RSA.generate(2048).exportKey("PEM").decode("utf-8")
                if obj:
                    obj.dkimkey = fresh_pem
                else:
                    f.initial = fresh_pem
            return f

        kwargs["formfield_callback"] = functools.partial(formfield_callback, request=req)

    return super().get_form(req, obj, **kwargs)
def generate_fingerprint(public_key):
    """Return the colon-separated MD5 fingerprint of an SSH public key.

    :param public_key: The public key as text; the second space-separated
        field is base64-decoded and hashed.
    :returns: The hex MD5 digest split into two-character groups joined
        by ``:``.
    :raises exception.InvalidKeypair: If any step fails.
    """
    try:
        # Test that the given public_key string is a proper ssh key. The
        # returned object is unused since pyca/cryptography does not have a
        # fingerprint method.
        serialization.load_ssh_public_key(
            public_key.encode('utf-8'), backends.default_backend())

        key_blob = base64.b64decode(public_key.split(' ')[1])
        hasher = hashes.Hash(hashes.MD5(), backends.default_backend())
        hasher.update(key_blob)
        hex_fp = binascii.hexlify(hasher.finalize())
        if six.PY3:
            hex_fp = hex_fp.decode('ascii')

        # Group the hex digest into pairs of characters.
        pairs = (hex_fp[i:i + 2] for i in range(0, len(hex_fp), 2))
        return ':'.join(pairs)
    except Exception:
        raise exception.InvalidKeypair(
            reason=_('failed to generate fingerprint'))
def generate_key(bits):
    """Create a paramiko RSAKey of the requested size."""
    # NOTE(dims): pycryptodome has changed the signature of the RSA.generate
    # call. specifically progress_func has been dropped. paramiko still uses
    # pycrypto. However some projects like latest pysaml2 have switched from
    # pycrypto to pycryptodome as pycrypto seems to have been abandoned.
    # paramiko project has started transition to pycryptodome as well but
    # there is no release yet with that support. So at the moment depending on
    # which version of pysaml2 is installed, Nova is likely to break. So we
    # call "RSA.generate(bits)" which works on both pycrypto and pycryptodome
    # and then wrap it into a paramiko.RSAKey
    generated = RSA.generate(bits)
    wrapped = paramiko.RSAKey(vals=(generated.e, generated.n))
    wrapped.d, wrapped.p, wrapped.q = generated.d, generated.p, generated.q
    return wrapped
def setUp(self):
    """Prepare environment variables, a signing key and a sample object."""
    self.set_env_var(CHUNK_SIZE_VAR, 2)

    # A throw-away private key, written to a file and exposed via the
    # environment.
    rsa_key = RSA.generate(1024)
    self.private_key = rsa_key
    key_path = self.create_file(rsa_key.exportKey())
    self.set_env_var(PRIVATE_KEY_FN, key_path)

    # Package metadata.
    self.version = '2.0'
    self.product = 'a' * 64
    self.hardware = 'PowerX'
    self.pkg_uid = 'pkg-uid'

    # Sample object file together with the SHA-256 of its content.
    payload = b'spam'
    self.obj_fn = self.create_file(payload, name='object')
    self.obj_sha256 = hashlib.sha256(payload).hexdigest()
    self.obj_options = dict([
        ('filename', self.obj_fn),
        ('mode', 'raw'),
        ('target-type', 'device'),
        ('target', '/dev/sda'),
    ])
def test_can_sign_dict(self):
    """``utils.sign_dict`` returns a base64 PKCS#1 v1.5 signature of the dict.

    A 1024-bit key is written to a temporary file, an empty dict is signed
    with it, and the decoded signature is verified against the SHA-256 of
    the dict's JSON encoding.
    """
    # mkstemp() returns an already-open descriptor; close it straight away
    # so it is not leaked -- the file is reopened by name below.
    fd, fn = tempfile.mkstemp()
    os.close(fd)
    self.addCleanup(os.remove, fn)

    key = RSA.generate(1024)
    with open(fn, 'wb') as fp:
        fp.write(key.exportKey())

    dict_ = {}
    message = SHA256.new(json.dumps(dict_).encode())
    result = utils.sign_dict(dict_, fn)

    signature = base64.b64decode(result)
    verifier = PKCS1_v1_5.new(key)
    is_valid = verifier.verify(message, signature)
    self.assertTrue(is_valid)
def generate_rsa_private_key():
    """Create and return a new 4096-bit RSA private key."""
    # Random.new().read is handed to RSA.generate as its randomness source.
    return RSA.generate(4096, Random.new().read)
def createKey():
    """Return a newly generated 1024-bit RSA key."""
    # Random.new().read is handed to RSA.generate as its randomness source.
    randfunc = Random.new().read
    new_key = RSA.generate(1024, randfunc)
    return new_key
def generate_OpenSSH(ctx, bits=2048):
    '''
    Create an RSA keypair with a public exponent of 65537.

    param: ctx  Not used by this function
    param: bits The key length in bits

    Return a tuple of the private key in PEM format and the public key in
    OpenSSH format
    '''
    keypair = RSA.generate(bits, e=65537)
    public_half = keypair.publickey()
    return keypair.exportKey("PEM"), public_half.exportKey("OpenSSH")
def setUp(self):
    """Create the shared random source and a 1024-bit RSA key."""
    randfunc = Random.new().read
    self.rng = randfunc
    self.key1024 = RSA.generate(1024, randfunc)

    # List of tuples with test data for PKCS#1 OAEP
    # Each tuple is made up by:
    #       Item #0: dictionary with RSA key component
    #       Item #1: plaintext
    #       Item #2: ciphertext
    #       Item #3: random data (=seed)
    #       Item #4: hash object
def setUp(self):
    """Set up the random source and the 1024-bit RSA key used by the tests."""
    self.rng = Random.new().read
    key = RSA.generate(1024, self.rng)
    self.key1024 = key

    # List of tuples with test data for PKCS#1 v1.5.
    # Each tuple is made up by:
    #       Item #0: dictionary with RSA key component, or key to import
    #       Item #1: plaintext
    #       Item #2: ciphertext
    #       Item #3: random data
def testSignVerify(self):
    """Sign and verify the same message with every supported hash module.

    A single 1024-bit key is generated; for each hash module a digest of
    ``'blah blah blah'`` is signed and immediately verified with the same
    signer object.
    """
    rng = Random.new().read
    key = RSA.generate(1024, rng)

    for hashmod in (MD2, MD5, SHA, SHA224, SHA256, SHA384, SHA512, RIPEMD):
        h = hashmod.new()
        h.update(b('blah blah blah'))

        signer = PKCS.new(key)
        s = signer.sign(h)
        result = signer.verify(h, s)
        # assertTrue replaces the deprecated failUnless alias, which was
        # removed from unittest in Python 3.12.
        self.assertTrue(result)
def generateRSAkey(options):
    """Generate an RSA key pair of ``options['bits']`` bits and save it.

    :param options: Mapping holding at least ``'bits'`` (convertible to
        int); it is also passed on to ``_saveKey`` unchanged.
    :returns: None.
    """
    from Crypto.PublicKey import RSA
    # Parenthesised form prints the same text on Python 2 and is valid
    # syntax on Python 3, unlike the print statement.
    print('Generating public/private rsa key pair.')
    key = RSA.generate(int(options['bits']), common.entropy.get_bytes)
    _saveKey(key, options)
def gen_privkey():
    """Return a new 1024-bit RSA private key in its exported form."""
    return RSA.generate(1024).exportKey()
def generate(bits):
    """Return a ``PyCrypto_RSAKey`` wrapping a newly generated RSA key.

    The randomness callback handed to ``RSA.generate`` draws bytes from
    ``getRandomBytes`` and converts them with ``bytesToString``.
    """
    def random_string(numBytes):
        raw = getRandomBytes(numBytes)
        return bytesToString(raw)

    wrapper = PyCrypto_RSAKey()
    wrapper.rsa = RSA.generate(bits, random_string)
    return wrapper
def gen_keys(self):
    """Generate an RSA key pair of ``self._key_size`` bits and store it.

    The exported private and public keys end up in ``self._private_key``
    and ``self._public_key``.
    """
    keypair = RSA.generate(self._key_size, urandom)
    # Both exports are evaluated before either attribute is assigned.
    self._private_key, self._public_key = (
        keypair.exportKey(),
        keypair.publickey().exportKey(),
    )
def getNodeKey(self):
    """Load the node's RSA key from ``.rsa/private.pem``, creating it if needed.

    If the private key cannot be read or imported, a new 2048-bit key is
    generated and both ``.rsa/private.pem`` and ``.rsa/public.pem`` are
    (re)written.

    :returns: The RSA key object.
    """
    try:
        with open(".rsa/private.pem", 'r') as key_file:
            key = RSA.importKey(key_file.read())
    except Exception:
        # Still best-effort for missing or unparsable files, but no longer
        # swallows KeyboardInterrupt/SystemExit, which would otherwise lead
        # to the stored key being silently replaced.
        key = RSA.generate(2048)
        with open(".rsa/private.pem", 'wb') as key_file:
            key_file.write(key.exportKey())
        with open(".rsa/public.pem", 'wb') as key_file:
            key_file.write(key.publickey().exportKey())
    return key
def random_uuid(self):
    '''
    Return the string form of a value from the module-level ``random_uuid``
    helper (described as a 'truly random' Python uuid4) - useful in creating
    Consul ACL tokens
    '''
    return str(random_uuid())
def random_rsa(self, keylength=DEFAULT_KEYLENGTH):
    '''
    Generate a random RSA keypair of ``keylength`` bits (DEFAULT_KEYLENGTH
    unless overridden).

    Returns a (private, public) tuple: the private key in PEM format
    followed by the public key in OpenSSH format.
    '''
    private = RSA.generate(keylength, os.urandom)
    public = private.publickey()
    private_pem = private.exportKey('PEM')
    public_openssh = public.exportKey('OpenSSH')
    return (private_pem, public_openssh)
def __create_key(self, name, path, bits=2048):
    """Generate an RSA key pair and write it to ``<path><name>`` and ``.pub``.

    The private key is written in PEM format with owner-only permissions and
    the public key in OpenSSH format to ``<path><name>.pub``.

    :param str name: File name of the private key.
    :param str path: Prefix prepended verbatim to ``name``; no separator is
        inserted, so it must already end with one.
    :param int bits: Key length in bits.
    :returns: None.
    """
    key = RSA.generate(bits)
    private_key = "{}{}".format(path, name)
    public_key = "{}{}.pub".format(path, name)

    # Create the file with mode 0o600 from the start so the private key is
    # never readable by others, then write bytes in binary mode.
    # ``0o600`` is valid on Python 2.6+ and 3; the bare ``0600`` literal is a
    # SyntaxError on Python 3.
    fd = os.open(private_key, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'wb') as content_file:
        # A pre-existing file keeps its old mode through os.open, so
        # tighten it explicitly as the original code did.
        os.chmod(private_key, 0o600)
        content_file.write(key.exportKey('PEM'))

    pubkey = key.publickey()
    with open(public_key, 'wb') as content_file:
        content_file.write(pubkey.exportKey('OpenSSH'))