我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用Crypto.PublicKey.RSA.importKey()。
def from_string(key_pem, is_x509_cert):
    """Build a verifier from PEM text holding a public key or certificate.

    Args:
        key_pem: string, the PEM-encoded public key or X509 certificate.
        is_x509_cert: bool, True when key_pem is an X509 certificate;
            otherwise key_pem is passed to RSA.importKey unchanged.

    Returns:
        A PyCryptoVerifier wrapping the imported public key.
    """
    if not is_x509_cert:
        return PyCryptoVerifier(RSA.importKey(key_pem))

    # With spaces removed, the first and last whitespace-separated tokens
    # are discarded (presumably the BEGIN/END marker lines) and the rest
    # is decoded into the DER form of the certificate.
    pem_tokens = _to_bytes(key_pem).replace(b' ', b'').split()
    der_bytes = _urlsafe_b64decode(b''.join(pem_tokens[1:-1]))

    certificate = DerSequence()
    certificate.decode(der_bytes)
    tbs_certificate = DerSequence()
    tbs_certificate.decode(certificate[0])
    # Element 6 of the inner sequence is what gets imported as the key
    # (assumed to be the subject public key info -- confirm for
    # certificates that omit the optional version field).
    return PyCryptoVerifier(RSA.importKey(tbs_certificate[6]))
def from_string(key, password='notasecret'):
    """Build a signer from a PEM-encoded private key.

    Args:
        key: string, the private key in PEM format.
        password: string, password for the private key file. It is not
            used by this implementation.

    Returns:
        A PyCryptoSigner wrapping the imported private key.

    Raises:
        NotImplementedError: if no PEM key could be parsed out of key.
    """
    pem_body = _parse_pem_key(_to_bytes(key))
    if not pem_body:
        raise NotImplementedError(
            'PKCS12 format is not supported by the PyCrypto library. '
            'Try converting to a "PEM" '
            '(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > '
            'privatekey.pem) '
            'or using PyOpenSSL if native code is an option.')
    return PyCryptoSigner(RSA.importKey(pem_body))
def from_string(key_pem, is_x509_cert):
    """Build a verifier from PEM text holding a public key or certificate.

    Args:
        key_pem: string, the PEM-encoded public key or X509 certificate.
        is_x509_cert: bool, True when key_pem is an X509 certificate, in
            which case the public key is first obtained through
            x509.get_pubkey; otherwise key_pem is imported directly.

    Returns:
        A PyCryptoVerifier wrapping the imported public key.
    """
    key_source = x509.get_pubkey(key_pem) if is_x509_cert else key_pem
    return PyCryptoVerifier(RSA.importKey(key_source))
def from_string(key, password='notasecret'):
    """Construct a Signer instance from a string.

    Args:
        key: string or bytes, private key in PEM format.
        password: string, password for private key file. Unused for PEM
            files.

    Returns:
        Signer instance.

    Raises:
        NotImplementedError: if the key isn't in PEM format.
    """
    # Compare against a marker of the same type as the input so that
    # bytes keys are rejected with NotImplementedError (as documented)
    # rather than failing with a TypeError inside startswith().
    pem_marker = b'-----BEGIN ' if isinstance(key, bytes) else '-----BEGIN '
    if not key.startswith(pem_marker):
        raise NotImplementedError(
            'PKCS12 format is not supported by the PyCrypto library. '
            'Try converting to a "PEM" '
            '(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
            'or using PyOpenSSL if native code is an option.')
    return PyCryptoSigner(RSA.importKey(key))
def verifySignature(self):
    """Check the stored signature against the stored response string.

    Reads the public key from the file named by self.raven_public_key,
    hashes self.rav_str without its last two "!"-separated fields, and
    verifies self.raven_signature against that hash.

    Returns:
        True if the signature verifies, False otherwise.
    """
    # Use a context manager so the key file is closed promptly instead of
    # being left for the garbage collector.
    with open(self.raven_public_key) as key_file:
        key = RSA.importKey(key_file.read())

    # Compile the parts to hash together; the last two items are related
    # to signing and are excluded from the hash.
    parts = self.rav_str.split("!")
    parts.pop()
    parts.pop()
    our_hash = SHA.new("!".join(parts))

    verifier = PKCS1_v1_5.new(key)

    # Obtain the correct form of the signature: undo the URL quoting, then
    # map the substituted characters back to the standard base64 ones.
    signature = urllib.unquote(self.raven_signature)
    for substituted, standard in (("-", "+"), (".", "/"), ("_", "=")):
        signature = signature.replace(substituted, standard)
    signature = base64.b64decode(signature)

    return bool(verifier.verify(our_hash, signature))
def testEncrypt1(self):
    """Encrypt each test vector with a replayed random stream.

    Each entry of self._testData is indexed as: [0] key to import,
    [1] plaintext, [2] expected ciphertext (hex text), [3] the random
    bytes (hex text) the cipher should consume.
    """
    class randGen:
        """RNG that takes its random numbers from a pool given at
        initialization."""

        def __init__(self, data):
            self.data = data
            self.idx = 0

        def __call__(self, N):
            # Slice relative to the current position; the upper bound
            # must advance with idx or every call after the first would
            # return a truncated or empty chunk.
            r = self.data[self.idx:self.idx + N]
            self.idx += N
            return r

    for test in self._testData:
        # Build the key
        key = RSA.importKey(test[0])
        # The real test: force the key to draw its randomness from the
        # recorded pool so the ciphertext is reproducible.
        key._randfunc = randGen(t2b(test[3]))
        cipher = PKCS.new(key)
        ct = cipher.encrypt(b(test[1]))
        self.assertEqual(ct, t2b(test[2]))
def testSign1(self):
    """Sign each test vector and compare with the expected signature.

    Each row of self._testData is indexed as: [0] key (importable string,
    or a mapping with hex components 'n', 'e', 'd'), [1] data to sign,
    [2] expected signature (hex text), [3] hash module providing new().
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            comps = [long(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not; fall back to the
        # raw form when hex decoding fails. Catch Exception rather than
        # everything so Ctrl-C and SystemExit still propagate.
        try:
            h.update(t2b(row[1]))
        except Exception:
            h.update(b(row[1]))
        # The real test
        signer = PKCS.new(key)
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of each test vector.

    Each row of self._testData is indexed as: [0] key (importable string,
    or a mapping with hex components 'n' and 'e'), [1] signed data,
    [2] signature (hex text), [3] hash module providing new().
    """
    for row in self._testData:
        # Build the key; only the public half is kept
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            comps = [long(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not; fall back to the
        # raw form when hex decoding fails. Catch Exception rather than
        # everything so Ctrl-C and SystemExit still propagate.
        try:
            h.update(t2b(row[1]))
        except Exception:
            h.update(b(row[1]))
        # The real test
        verifier = PKCS.new(key)
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
def from_string(key_pem, is_x509_cert):
    """Build a verifier from PEM text holding a public key or certificate.

    Args:
        key_pem: string, the PEM-encoded public key or X509 certificate.
        is_x509_cert: bool, True when key_pem is an X509 certificate;
            otherwise key_pem is passed to RSA.importKey unchanged.

    Returns:
        A PyCryptoVerifier wrapping the imported public key.
    """
    if not is_x509_cert:
        return PyCryptoVerifier(RSA.importKey(key_pem))

    # With spaces removed, the first and last whitespace-separated tokens
    # are discarded (presumably the BEGIN/END marker lines) and the rest
    # is decoded into the DER form of the certificate.
    pem_bytes = _helpers._to_bytes(key_pem)
    pem_tokens = pem_bytes.replace(b' ', b'').split()
    der_bytes = _helpers._urlsafe_b64decode(b''.join(pem_tokens[1:-1]))

    certificate = DerSequence()
    certificate.decode(der_bytes)
    tbs_certificate = DerSequence()
    tbs_certificate.decode(certificate[0])
    # Element 6 of the inner sequence is what gets imported as the key
    # (assumed to be the subject public key info -- confirm for
    # certificates that omit the optional version field).
    return PyCryptoVerifier(RSA.importKey(tbs_certificate[6]))
def from_string(key, password='notasecret'):
    """Build a signer from a PEM-encoded private key.

    Args:
        key: string, the private key in PEM format.
        password: string, password for the private key file. It is not
            used by this implementation.

    Returns:
        A PyCryptoSigner wrapping the imported private key.

    Raises:
        NotImplementedError: if no PEM key could be parsed out of key.
    """
    pem_body = _helpers._parse_pem_key(_helpers._to_bytes(key))
    if not pem_body:
        raise NotImplementedError(
            'No key in PEM format was detected. This implementation '
            'can only use the PyCrypto library for keys in PEM '
            'format.')
    return PyCryptoSigner(RSA.importKey(pem_body))
def import_ssh_key(private_key_file, passphrase=None):
    """Import an RSA private key from a file.

    param private_key_file: path to private key file
    param passphrase: passphrase for the private key, by default None
    return: the key object produced by RSA.importKey
    raises exception.PrivateKeyNotFound: if the file cannot be read
    raises exception.ImportKeyError: if any other error occurs while
        importing the key
    """
    private_key_file = os.path.abspath(private_key_file)
    try:
        with open(private_key_file, 'r') as key_file:
            # importKey expects the encoded key as one string, so read
            # the whole file rather than a list of lines.
            key_file_contents = key_file.read()
        return RSA.importKey(key_file_contents, passphrase=passphrase)
    except IOError:
        raise exception.PrivateKeyNotFound(private_key_file)
    except Exception:
        # Any other failure is deliberately reported as an import error
        # for this path.
        raise exception.ImportKeyError(private_key_file)
def _build_auth_token_data(
    self,
    auth_token_ticket,
    authenticator,
    private_key,
    **kwargs
):
    """Serialize, sign and base64-encode an auth token.

    The token is a JSON object (keys sorted) holding the authenticator,
    the ticket and any extra keyword arguments. The SHA256 digest of the
    serialized JSON (before base64 encoding) is signed with PKCS1 v1.5
    using the given RSA private key.

    Returns:
        A (token, signature) tuple, both base64-encoded.
    """
    payload = json.dumps(
        dict(authenticator=authenticator, ticket=auth_token_ticket, **kwargs),
        sort_keys=True)
    if six.PY3:
        # Hashing and base64 need bytes on Python 3.
        payload = payload.encode('utf-8')

    digest = SHA256.new()
    digest.update(payload)
    encoded_token = base64.b64encode(payload)

    signer = PKCS1_v1_5.new(RSA.importKey(private_key))
    encoded_signature = base64.b64encode(signer.sign(digest))
    return encoded_token, encoded_signature
def from_string(key_pem, is_x509_cert):
    """Build a verifier from PEM text holding a public key or certificate.

    Args:
        key_pem: string, the PEM-encoded public key or X509 certificate.
        is_x509_cert: bool, True when key_pem is an X509 certificate;
            otherwise key_pem is passed to RSA.importKey unchanged.

    Returns:
        A PyCryptoVerifier wrapping the imported public key.
    """
    if not is_x509_cert:
        return PyCryptoVerifier(RSA.importKey(key_pem))

    # The certificate is processed as bytes; text input is ASCII-encoded.
    pem_bytes = key_pem
    if isinstance(pem_bytes, six.text_type):
        pem_bytes = pem_bytes.encode('ascii')

    # With spaces removed, the first and last whitespace-separated tokens
    # are discarded (presumably the BEGIN/END marker lines) and the rest
    # is decoded into the DER form of the certificate.
    pem_tokens = pem_bytes.replace(b' ', b'').split()
    der_bytes = _urlsafe_b64decode(b''.join(pem_tokens[1:-1]))

    certificate = DerSequence()
    certificate.decode(der_bytes)
    tbs_certificate = DerSequence()
    tbs_certificate.decode(certificate[0])
    # Element 6 of the inner sequence is what gets imported as the key
    # (assumed to be the subject public key info -- confirm for
    # certificates that omit the optional version field).
    return PyCryptoVerifier(RSA.importKey(tbs_certificate[6]))
def from_string(key, password='notasecret'):
    """Build a signer from a PEM-encoded private key.

    Args:
        key: string, the private key in PEM format.
        password: string, password for the private key file. It is not
            used by this implementation.

    Returns:
        A PyCryptoSigner wrapping the imported private key.

    Raises:
        NotImplementedError: if no PEM key could be parsed out of key.
    """
    pem_body = _parse_pem_key(key)
    if not pem_body:
        raise NotImplementedError(
            'PKCS12 format is not supported by the PyCrypto library. '
            'Try converting to a "PEM" '
            '(openssl pkcs12 -in xxxxx.p12 -nodes -nocerts > privatekey.pem) '
            'or using PyOpenSSL if native code is an option.')
    return PyCryptoSigner(RSA.importKey(pem_body))
def from_string(key_pem, is_x509_cert):
    """Build a verifier from PEM text holding a public key or certificate.

    Args:
        key_pem: string, the PEM-encoded public key or X509 certificate.
        is_x509_cert: bool, True when key_pem is an X509 certificate;
            otherwise key_pem is passed to RSA.importKey unchanged.

    Returns:
        A PyCryptoVerifier wrapping the imported public key.
    """
    if not is_x509_cert:
        return PyCryptoVerifier(RSA.importKey(key_pem))

    # With spaces removed, the first and last whitespace-separated tokens
    # are discarded (presumably the BEGIN/END marker lines) and the rest
    # is decoded into the DER form of the certificate.
    pem_tokens = key_pem.replace(' ', '').split()
    der_bytes = _urlsafe_b64decode(''.join(pem_tokens[1:-1]))

    certificate = DerSequence()
    certificate.decode(der_bytes)
    tbs_certificate = DerSequence()
    tbs_certificate.decode(certificate[0])
    # Element 6 of the inner sequence is what gets imported as the key
    # (assumed to be the subject public key info -- confirm for
    # certificates that omit the optional version field).
    return PyCryptoVerifier(RSA.importKey(tbs_certificate[6]))
def __init__(self, grid_name, credentials, **kwargs):
    """Load grid state, SSH key material and prepare the output directory.

    Looks up the grid named grid_name and its provider-specific config,
    imports the RSA private key stored in that config, derives the
    OpenSSH public key, ensures 'result/<grid_name>/infrastructure'
    exists, and collects the provider's groups that belong to this grid.
    Extra keyword arguments are accepted and ignored.
    """
    self.grid_name = grid_name
    self.credentials = urllib.unquote(credentials)

    self.current_grid = GridEntity.select().where(
        GridEntity.name == grid_name).get()
    provider_config = configs[self.current_grid.provider]
    self.current_config = provider_config.select().where(
        provider_config.parentgrid == self.current_grid).get()

    # The key is stored URL-quoted in the config record.
    self.private_key_text = urllib.unquote(self.current_config.sshkeydata)
    self.private_key = RSA.importKey(self.private_key_text.strip())
    self.public_key_text = self.private_key.publickey().exportKey('OpenSSH')

    self.current_groups = []
    infrastructure_dir = 'result/{}/infrastructure'.format(grid_name)
    if not os.path.exists(infrastructure_dir):
        os.makedirs(infrastructure_dir)

    # Keep only the groups whose parent grid is this one.
    self.current_groups.extend(
        candidate
        for candidate in groups[self.current_grid.provider].select()
        if candidate.parentgrid.name == grid_name)

    # Empty containers, one per section, each a separate AutoDict.
    for section in ('config', 'networking', 'security', 'terminal',
                    'masters'):
        setattr(self, section, AutoDict())
def __init__(self, grid_name, api_user, api_pass, api_url, **kwargs):
    """Load grid state, API credentials, SSH key and output directory.

    Stores the URL-unquoted API user, password and URL, looks up the grid
    named grid_name and its provider-specific config, imports the RSA
    private key stored in that config, derives the OpenSSH public key,
    ensures 'result/<grid_name>/infrastructure' exists, and collects the
    provider's groups that belong to this grid. Extra keyword arguments
    are accepted and ignored.
    """
    self.grid_name = grid_name
    self.api_user = urllib.unquote(api_user)
    self.api_pass = urllib.unquote(api_pass)
    self.api_url = urllib.unquote(api_url)

    self.current_grid = GridEntity.select().where(
        GridEntity.name == grid_name).get()
    provider_config = configs[self.current_grid.provider]
    self.current_config = provider_config.select().where(
        provider_config.parentgrid == self.current_grid).get()

    # The key is stored URL-quoted in the config record.
    self.private_key_text = urllib.unquote(self.current_config.sshkeydata)
    self.private_key = RSA.importKey(self.private_key_text.strip())
    self.public_key_text = self.private_key.publickey().exportKey('OpenSSH')

    self.current_groups = []
    infrastructure_dir = 'result/{}/infrastructure'.format(grid_name)
    if not os.path.exists(infrastructure_dir):
        os.makedirs(infrastructure_dir)

    # Keep only the groups whose parent grid is this one.
    self.current_groups.extend(
        candidate
        for candidate in groups[self.current_grid.provider].select()
        if candidate.parentgrid.name == grid_name)

    # Empty containers, one per section, each a separate AutoDict.
    for section in ('config', 'networking', 'security', 'terminal',
                    'masters'):
        setattr(self, section, AutoDict())
def check_rsa_key(sample):
    """Report whether sample is an importable RSA key.

    Returns a 3-tuple (is_rsa_key, has_private_component, n_bit_length):

    is_rsa_key - a bool indicating that the sample is an RSA key in a
        format readable by Crypto.PublicKey.RSA.importKey
    has_private_component - a bool indicating whether the key reports a
        private component, or False if the sample is not an RSA key
    n_bit_length - an int with the bit length of the key's modulus, or
        False if the sample is not an RSA key

    Any failure while importing or inspecting the key yields
    (False, False, False).
    """
    try:
        rsakey = RSA.importKey(sample.strip())
        has_private_component = bool(rsakey.has_private())
        n_bit_length = gmpy.mpz(rsakey.n).bit_length()
    # Don't really care why it fails, just want to see if it did. Catch
    # Exception (not everything) so Ctrl-C and SystemExit still propagate,
    # and return a consistent all-False tuple instead of partial results.
    except Exception:
        return (False, False, False)
    return (True, has_private_component, n_bit_length)
def testSign1(self):
    """Sign each test vector and compare with the expected signature.

    Each row of self._testData is indexed as: [0] key (importable string,
    or a mapping with hex components 'n', 'e', 'd'), [1] data to sign,
    [2] expected signature (hex text), [3] hash module providing new().
    """
    for row in self._testData:
        # Build the key
        if isStr(row[0]):
            key = RSA.importKey(row[0])
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e', 'd')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not; fall back to the
        # raw form when hex decoding fails. Catch Exception rather than
        # everything so Ctrl-C and SystemExit still propagate.
        try:
            h.update(t2b(row[1]))
        except Exception:
            h.update(b(row[1]))
        # The real test
        signer = PKCS.new(key)
        self.assertTrue(signer.can_sign())
        s = signer.sign(h)
        self.assertEqual(s, t2b(row[2]))
def testVerify1(self):
    """Verify the expected signature of each test vector.

    Each row of self._testData is indexed as: [0] key (importable string,
    or a mapping with hex components 'n' and 'e'), [1] signed data,
    [2] signature (hex text), [3] hash module providing new().
    """
    for row in self._testData:
        # Build the key; only the public half is kept
        if isStr(row[0]):
            key = RSA.importKey(row[0]).publickey()
        else:
            comps = [int(rws(row[0][x]), 16) for x in ('n', 'e')]
            key = RSA.construct(comps)
        h = row[3].new()
        # Data to sign can either be in hex form or not; fall back to the
        # raw form when hex decoding fails. Catch Exception rather than
        # everything so Ctrl-C and SystemExit still propagate.
        try:
            h.update(t2b(row[1]))
        except Exception:
            h.update(b(row[1]))
        # The real test
        verifier = PKCS.new(key)
        self.assertFalse(verifier.can_sign())
        result = verifier.verify(h, t2b(row[2]))
        self.assertTrue(result)
def private_key(self):
    """Return the imported RSA private key, or None when none is stored.

    Required by federation. Corresponds to private key.
    """
    stored_key = self.rsa_private_key
    if not stored_key:
        return None
    return RSA.importKey(stored_key)
def key(self):
    """Return the imported RSA public key, or None when none is stored.

    Required by federation. Corresponds to public key.
    """
    stored_key = self.rsa_public_key
    if not stored_key:
        return None
    return RSA.importKey(stored_key)
def test_user_public_key(user_contract, web3):
    """Round-trip a public key through the contract and use it to encrypt.

    Stores the DER export of a fresh key's public half on the contract,
    reads it back for the first account, imports it, encrypts a message
    with it and prints the result of decrypting with the original key.
    """
    private_key = encrypt.createKey()
    exported_der = private_key.publickey().exportKey('DER')
    print(sys.getsizeof(exported_der))

    user_contract.transact().setPubKey(exported_der)
    stored_der = user_contract.call().userPubkey(web3.eth.accounts[0])
    print(stored_der)

    imported_public_key = RSA.importKey(stored_der)
    ciphertext = encrypt.encrypt(imported_public_key, 'Hello there!')
    print(encrypt.decrypt(ciphertext, private_key))
def test_ipfs_integration(user_contract, web3):
    """Send an encrypted message via IPFS and the contract, then decrypt.

    The message is encrypted with the public key read back from the
    contract, added to a locally running IPFS daemon, announced through
    the contract's sendMessage call, recovered from the resulting
    "YouHaveMail" event, fetched from IPFS and decrypted with the
    original private key.
    """
    # Create a new private key and export its public half
    private_key = encrypt.createKey()
    exported_der = private_key.publickey().exportKey('DER')

    # Store the public key on the contract, then read it back for the
    # first account and import it
    user_contract.transact().setPubKey(exported_der)
    stored_der = user_contract.call().userPubkey(web3.eth.accounts[0])
    recipient_key = RSA.importKey(stored_der)

    # Encrypt the message with the recipient's public key
    ciphertext = encrypt.encrypt(recipient_key, 'Last Night...')

    # Connect to the locally running IPFS daemon and add the ciphertext
    ipfs = ipfsapi.connect('127.0.0.1', 5001)
    created_hash = ipfs.add_bytes(ciphertext)
    print("\n\nIPFS_HASH_CREATE:\t", created_hash)

    # Watch for YouHaveMail events, then send the hash through the contract
    mail_filter = user_contract.on("YouHaveMail")
    user_contract.transact().sendMessage(web3.eth.accounts[1], created_hash)
    wait(mail_filter)
    events = mail_filter.get()

    # Extract the IPFS hash carried by the first event
    event_hash = events[0]['args']['_ipfsHash']
    print("IPFS_HASH_EVENT:\t", event_hash)

    # Fetch the stored ciphertext back from IPFS
    fetched = ipfs.cat(event_hash)
    print("IPFS_HASH_DATA:\t\t", fetched)

    # Decrypt the message with the private key
    print("IFPS_DATA_DECR:\t\t", encrypt.decrypt(fetched, private_key))