我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用cryptography.hazmat.primitives.serialization.load_pem_private_key()。
def test_key_not_exists(self, pem_path):
    """
    Check that ``maybe_key`` creates and persists a key when none exists.

    After the call, ``client.key`` must exist under ``pem_path`` and the key
    read back from that file must equal the key that was returned.
    """
    returned_key = maybe_key(pem_path)

    key_file = pem_path.child(u'client.key')
    assert_that(key_file.exists(), Equals(True))

    loaded = serialization.load_pem_private_key(
        key_file.getContent(),
        password=None,
        backend=default_backend(),
    )
    # Wrap the loaded key the same way before comparing.
    assert_that(returned_key, Equals(JWKRSA(key=loaded)))
def _restore_state(self):
    """
    Restore user state from the state store.

    The stored value is read under ``self._state_store_key``, unhexlified and
    unpickled into a dict whose entries are copied onto this instance. When
    the dict carries a truthy ``enrollment`` entry, its PEM private key is
    loaded and wrapped in an ``Enrollment`` together with the stored cert.

    :raises IOError: if any step of the restore fails.
    """
    # NOTE(review): pickle.loads can execute arbitrary code on crafted input;
    # this is only safe if the state store contents are trusted.
    try:
        hex_state = self._state_store.get_value(self._state_store_key)
        raw_state = binascii.unhexlify(hex_state.encode("utf-8"))
        restored = pickle.loads(raw_state)

        self._name = restored['name']
        self.enrollment_secret = restored['enrollment_secret']

        stored_enrollment = restored['enrollment']
        if stored_enrollment:
            key = serialization.load_pem_private_key(
                stored_enrollment['private_key'],
                password=None,
                backend=default_backend()
            )
            self.enrollment = Enrollment(key, stored_enrollment['cert'])

        # Remaining fields are copied verbatim, in the original order.
        for attr_name, field in (
                ('affiliation', 'affiliation'),
                ('account', 'account'),
                ('roles', 'roles'),
                ('_org', 'org'),
                ('msp_id', 'msp_id')):
            setattr(self, attr_name, restored[field])
    except Exception as e:
        raise IOError("Cannot deserialize the user", e)
def read_file(self, subject_name, pw=None):
    """
    Load the unit-test certificate and private key for a subject.

    The files are read from ``../../resources/unittest/<subject_name>/``
    relative to this module: ``cert.pem`` and ``key.pem``.

    :param subject_name: name of the sub-directory holding the PEM files
    :param pw: password for the private key, or None if it is unencrypted
    :return: dict with the loaded certificate under ``'cert'`` and the loaded
        private key under ``'private_key'``
    """
    base_dir = os.path.dirname(__file__)

    pem_path = os.path.join(base_dir, "../../resources/unittest/" + subject_name + "/cert.pem")
    # Context managers guarantee the handles are closed even if read() fails.
    with open(pem_path, "rb") as f:
        cert_pem = f.read()
    cert = x509.load_pem_x509_certificate(cert_pem, default_backend())

    key_path = os.path.join(base_dir, "../../resources/unittest/" + subject_name + "/key.pem")
    with open(key_path, "rb") as f:
        cert_key = f.read()
    private_key = serialization.load_pem_private_key(cert_key, pw, default_backend())

    return {'cert': cert, 'private_key': private_key}
def load_pki(self, cert_path: str, cert_pass=None):
    """
    Load the CA certificate and private key from a directory.

    :param cert_path: directory containing the files named by
        ``self.CERT_NAME`` and ``self.PRI_NAME``
    :param cert_pass: password handed to the private key loader
    """
    cert_file_path = join(cert_path, self.CERT_NAME)
    key_file_path = join(cert_path, self.PRI_NAME)

    # Certificate
    with open(cert_file_path, "rb") as cert_fh:
        pem_cert = cert_fh.read()
    self.__ca_cert = x509.load_pem_x509_certificate(pem_cert, default_backend())

    # Private key
    with open(key_file_path, "rb") as key_fh:
        pem_key = key_fh.read()
    try:
        self.__ca_pri = serialization.load_pem_private_key(pem_key, cert_pass, default_backend())
    except ValueError:
        # NOTE(review): execution continues to the sign/verify check below
        # even though the key was not loaded - confirm this is intended.
        logging.debug("Invalid Password")

    # Sign/verify round trip as a sanity check of the loaded material.
    test_signature = self.sign_data(b'TEST')
    if self.verify_data(b'TEST', test_signature) is False:
        logging.debug("Invalid Signature(Root Certificate load test)")
def create_decryptor(private_location, public_location):
    """
    Build a decrypt function backed by the RSA key at ``private_location``.

    If the private key file does not exist, a 2048-bit RSA key is generated
    and written there (unencrypted PEM), and its public half is written to
    ``public_location``.

    :return: a callable taking ciphertext and returning the OAEP-decrypted
        plaintext (MGF1 and hash are both SHA-1).
    """
    try:
        with open(private_location, "rb") as existing:
            private_key = serialization.load_pem_private_key(
                existing.read(),
                password=None,
                backend=default_backend()
            )
    except FileNotFoundError:
        with open(private_location, "wb") as private_out:
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048,
                backend=default_backend()
            )
            private_pem = private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.TraditionalOpenSSL,
                encryption_algorithm=serialization.NoEncryption()
            )
            private_out.write(private_pem)
        with open(public_location, "wb") as public_out:
            public_pem = private_key.public_key().public_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PublicFormat.SubjectPublicKeyInfo
            )
            public_out.write(public_pem)

    def decrypt(ciphertext):
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
            algorithm=hashes.SHA1(),
            label=None
        )
        return private_key.decrypt(ciphertext, oaep)

    return decrypt
def _load_cryptography_key(cls, data, password=None, backend=None):
    """
    Deserialize ``data`` as a key, trying several encodings in turn.

    Private PEM, private DER, public PEM and public DER loaders are tried in
    that order; the first one that succeeds supplies the return value.

    :raises errors.Error: if every loader fails; the message lists the error
        recorded for each loader.
    """
    if backend is None:
        backend = default_backend()

    unsupported = cryptography.exceptions.UnsupportedAlgorithm
    private_errors = (ValueError, TypeError, unsupported)
    public_errors = (ValueError, unsupported)

    # (loader, positional arguments, exceptions treated as "try the next one")
    attempts = (
        (serialization.load_pem_private_key, (data, password, backend), private_errors),
        (serialization.load_der_private_key, (data, password, backend), private_errors),
        (serialization.load_pem_public_key, (data, backend), public_errors),
        (serialization.load_der_public_key, (data, backend), public_errors),
    )

    failures = {}
    for loader, args, expected in attempts:
        try:
            return loader(*args)
        except expected as error:
            failures[loader] = error

    # no luck
    raise errors.Error('Unable to deserialize key: {0}'.format(failures))
def get_channel(self):
    """
    Fetch the channel assigned to this device and store it on the instance.

    A token is requested from ``/get_device_token``, signed with the private
    key at ``CloudLinkSettings.PRIVATE_KEY_LOCATION`` (PKCS#1 v1.5, SHA-256),
    and the base64 signature is sent to ``/get_device_group``. On success
    ``self.channel_uri`` is set from the response.

    :raises Exception: on a 400 or 403 response from ``/get_device_group``.
    """
    # First get a token we need to sign in order to prove we are who we say we are.
    r = requests.get(str(self.base_link_uri) + "/get_device_token",
                     params={"UUID": self.uuid, })
    token = r.json()["token"]

    # Get the private key. The PEM loader needs bytes, so read in binary mode.
    with open(CloudLinkSettings.PRIVATE_KEY_LOCATION, 'rb') as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=CloudLinkSettings.PRIVATE_KEY_PASSPHRASE,
            backend=default_backend())

    # bytes(str) raises TypeError on Python 3, so encode text tokens explicitly.
    if not isinstance(token, bytes):
        token = token.encode('utf-8')

    # Sign the token with our private key.
    signer = private_key.signer(padding.PKCS1v15(), hashes.SHA256())
    signer.update(token)
    signature = signer.finalize()

    # Get the randomly assigned channel for my UUID.
    r = requests.get(str(self.base_link_uri) + "/get_device_group",
                     params={"UUID": self.uuid,
                             "signature": b64encode(signature),
                             "format": "PKCS1_v1_5"})
    if r.ok:
        self.channel_uri = r.json()["channel"]
    elif r.status_code == 400:
        raise Exception("UUID or Token not registered with Cloud.")
    elif r.status_code == 403:
        raise Exception("Signature didn't verify correctly. Bad private key or signature.")
def decode_pem_key(key_pem):
    """Convert plaintext PEM key into the format usable for JWT generation

    Args:
        key_pem (str): key data in PEM format, presented as plain string

    Returns:
        Parsed PEM data

    Raises:
        AssertionError: if the key is not an RSA private key or is smaller
            than 2048 bits.
    """
    private_key = serialization.load_pem_private_key(
        data=key_pem.encode('ascii'),
        password=None,
        backend=default_backend())

    # Raise explicitly instead of using ``assert`` so the checks still run
    # under ``python -O``; the exception type callers see is unchanged.
    if not isinstance(private_key, rsa.RSAPrivateKey):
        raise AssertionError('Unexpected private key type')
    if not private_key.key_size >= 2048:
        raise AssertionError('RSA key size too small')

    return private_key
def load_private_key(privkey_file):
    '''
    Read the PEM file at ``privkey_file`` and return the private key in it.

    ``privkey_file`` is the path to the key file; the key is loaded without
    a password using the default backend.
    '''
    with open(privkey_file, "rb") as pem_handle:
        pem_bytes = pem_handle.read()
        return serialization.load_pem_private_key(
            pem_bytes,
            password=None,
            backend=default_backend()
        )
def rsa_sign(private_key, message):
    """Signs a message with the provided Rsa private key.

    Args:
        private_key (str): Rsa private key with BEGIN and END sections.
            Bytes are accepted as well.
        message (str): Message to be hashed and signed.

    Returns:
        Hex encoding of the signature as produced by ``binascii.hexlify``.
    """
    # bytes(str) raises TypeError on Python 3; encode text input explicitly
    # and keep the old conversion for any other bytes-like input.
    if isinstance(private_key, bytes):
        key_bytes = private_key
    elif hasattr(private_key, 'encode'):
        key_bytes = private_key.encode('utf-8')
    else:
        key_bytes = bytes(private_key)

    private_rsa = load_pem_private_key(key_bytes,
                                       password=None,
                                       backend=default_backend())
    # NOTE(review): ``sign`` is also given SHA256 as its algorithm, so the
    # output of the ``sha256`` helper is presumably hashed again - confirm.
    hashed = sha256(message)
    signature = private_rsa.sign(
        hashed,
        padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        ),
        hashes.SHA256()
    )
    return binascii.hexlify(bytearray(signature))
def load_or_create_client_key(pem_path):
    """
    Return the client key stored under ``pem_path``, creating it if absent.

    The key lives in ``client.key``. A missing file causes a new key to be
    generated via ``generate_private_key(u'rsa')`` and written out as
    unencrypted PEM.

    :type pem_path: ``twisted.python.filepath.FilePath``
    :param pem_path: The certificate directory to use, as with the endpoint.
    :return: the key wrapped in ``JWKRSA``.
    """
    key_file = pem_path.asTextMode().child(u'client.key')

    if not key_file.exists():
        new_key = generate_private_key(u'rsa')
        pem_bytes = new_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption())
        key_file.setContent(pem_bytes)
        return JWKRSA(key=new_key)

    stored_key = serialization.load_pem_private_key(
        key_file.getContent(),
        password=None,
        backend=default_backend())
    return JWKRSA(key=stored_key)
def __init__(self, signer):
    """
    Set up the message factory and the keys used by this instance.

    :param signer: signer handed to the ``MessageFactory``; its public key
        is also hashed (SHA-256 over the hex form) into ``public_key_hash``.
    """
    self._factory = MessageFactory(
        family_name="sawtooth_validator_registry",
        family_version="1.0",
        namespace="6a4372",
        signer=signer
    )

    signer_public_hex = signer.get_public_key().as_hex()
    self.public_key_hash = hashlib.sha256(signer_public_hex.encode()).hexdigest()

    report_key_pem = self.__REPORT_PRIVATE_KEY_PEM__.encode()
    self._report_private_key = serialization.load_pem_private_key(
        report_key_pem,
        password=None,
        backend=backends.default_backend())

    # First we need to create a public/private key pair for the PoET
    # enclave to use.
    secp_context = create_context('secp256k1')
    self._poet_private_key = Secp256k1PrivateKey.from_hex(
        "1f70fa2518077ad18483f48e77882d11983b537fa5f7cf158684d2c670fe4f1f")
    self.poet_public_key = secp_context.get_public_key(self._poet_private_key)
def _mgf1_sha256_supported():
    """
    Report whether OAEP with MGF1/SHA-256 can be used for encryption.

    A test private key is loaded and its public key is asked to encrypt a
    fixed plaintext; ``UnsupportedAlgorithm`` means the answer is False.
    """
    test_key = serialization.load_pem_private_key(
        data=VALUES['raw'][b'asym1'][EncryptionKeyType.PRIVATE],
        password=None,
        backend=default_backend()
    )
    try:
        test_key.public_key().encrypt(
            plaintext=b'aosdjfoiajfoiaj;foijae;rogijaerg',
            padding=padding.OAEP(
                mgf=padding.MGF1(hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )
    except cryptography.exceptions.UnsupportedAlgorithm:
        return False
    else:
        return True
def __init__(self, wrapping_algorithm, wrapping_key, wrapping_key_type, password=None):
    """
    Store the wrapping algorithm and load the wrapping key for its type.

    PRIVATE and PUBLIC keys are parsed from PEM; a SYMMETRIC key is stored
    as given and a derived wrapping key is computed from it.

    :raises InvalidDataKeyError: if ``wrapping_key_type`` is none of the
        three handled types.
    """
    self.wrapping_algorithm = wrapping_algorithm
    self.wrapping_key_type = wrapping_key_type

    if wrapping_key_type is EncryptionKeyType.PRIVATE:
        self._wrapping_key = serialization.load_pem_private_key(
            data=wrapping_key,
            password=password,
            backend=default_backend()
        )
        return

    if wrapping_key_type is EncryptionKeyType.PUBLIC:
        self._wrapping_key = serialization.load_pem_public_key(
            data=wrapping_key,
            backend=default_backend()
        )
        return

    if wrapping_key_type is EncryptionKeyType.SYMMETRIC:
        self._wrapping_key = wrapping_key
        self._derived_wrapping_key = derive_data_encryption_key(
            source_key=self._wrapping_key,
            algorithm=self.wrapping_algorithm.algorithm,
            message_id=None
        )
        return

    raise InvalidDataKeyError('Invalid wrapping_key_type: {}'.format(wrapping_key_type))
def __call__(self, parser, namespace, values, option_string):
    """
    Load ``values`` according to ``namespace.key_type`` and store the result.

    ``'raw'`` stores the output of ``raw_loader`` directly; ``'pem'`` and
    ``'der'`` additionally parse it as an unencrypted private key. Any other
    key type leaves ``namespace`` untouched.
    """
    key_type = namespace.key_type

    if key_type == 'raw':
        loaded = raw_loader(values)
    elif key_type == 'pem':
        loaded = serialization.load_pem_private_key(raw_loader(values), None, backend)
    elif key_type == 'der':
        loaded = serialization.load_der_private_key(raw_loader(values), None, backend)
    else:
        return

    setattr(namespace, self.dest, loaded)
def __init__(self, pem_private_key, private_key_password=None):
    """
    RSA Certificate Authority used to sign certificates.

    :param pem_private_key: PEM formatted RSA Private Key. Encrypting it
        with a password is recommended but not required.
    :param private_key_password: Password used to decrypt the PEM RSA
        Private Key when it is encrypted.
    """
    super(SSHCertificateAuthority, self).__init__()
    self.public_key_type = SSHPublicKeyType.RSA

    self.private_key = load_pem_private_key(
        pem_private_key,
        private_key_password,
        default_backend())
    # Signer uses PKCS#1 v1.5 padding with SHA-1.
    self.signer = self.private_key.signer(padding.PKCS1v15(), hashes.SHA1())

    # Expose the public exponent and modulus of the CA key.
    public_numbers = self.private_key.public_key().public_numbers()
    self.e = public_numbers.e
    self.n = public_numbers.n
def maybe_key(pem_path):
    """
    Return the client key under ``pem_path``, generating one if missing.

    https://gist.github.com/glyph/27867a478bb71d8b6046fbfb176e1a33#file-local-certs-py-L32-L50

    :type pem_path: twisted.python.filepath.FilePath
    :param pem_path: The path to the certificate directory to use.
    :return: the key wrapped in ``jose.JWKRSA``.
    """
    key_file = pem_path.child(u'client.key')

    if not key_file.exists():
        # No key yet: create one and persist it as unencrypted PEM.
        fresh_key = generate_private_key(u'rsa')
        pem_bytes = fresh_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        )
        key_file.setContent(pem_bytes)
        return jose.JWKRSA(key=fresh_key)

    stored_key = serialization.load_pem_private_key(
        key_file.getContent(),
        password=None,
        backend=default_backend()
    )
    return jose.JWKRSA(key=stored_key)
def get_peer_org_user(client, peer_org, user='Admin'):
    """
    Build a ``User`` for ``user`` in ``peer_org`` from the e2e test fixtures.

    The key and certificate file names come from ``E2E_CONFIG``; both files
    are read from the org's ``msp`` directory below the current working
    directory. The returned user carries an ``Enrollment`` and the org's
    MSP id.
    """
    org_config = E2E_CONFIG['test-network'][peer_org]

    msp_dir = os.path.join(
        os.getcwd(),
        'test/fixtures/e2e_cli/crypto-config/peerOrganizations/{0}'
        '/users/{1}@{0}/msp/'.format(peer_org, user)
    )
    key_path = os.path.join(
        msp_dir, 'keystore/', org_config['users'][user]['private_key'])
    cert_path = os.path.join(
        msp_dir, 'signcerts/', org_config['users'][user]['cert'])

    with open(key_path, 'rb') as key_fh:
        key_pem = key_fh.read()
    with open(cert_path, 'rb') as cert_fh:
        cert_pem = cert_fh.read()

    org_user = User('peer' + peer_org + user, peer_org, client.state_store)

    # Wrap the key in a 'cryptography' private key object so that all the
    # methods can be used.
    loaded_key = load_pem_private_key(key_pem, None, default_backend())
    org_user.enrollment = Enrollment(loaded_key, cert_pem)
    org_user.msp_id = org_config['mspid']

    return org_user
def append_signature(target_file, priv_key, priv_key_password=None):
    """
    Sign the contents of ``target_file`` and append the signature to it.

    The key is read from the PEM file ``priv_key`` (optionally decrypted with
    ``priv_key_password``); the signature uses PKCS#1 v1.5 with SHA-512.
    """
    with open(target_file, "rb") as target_in:
        payload = target_in.read()

    with open(priv_key, "rb") as key_in:
        signing_key = serialization.load_pem_private_key(
            kf_bytes if False else key_in.read(),
            password=priv_key_password,
            backend=default_backend())

    signature = signing_key.sign(payload, padding.PKCS1v15(), hashes.SHA512())

    with open(target_file, "ab") as target_out:
        target_out.write(signature)
def load_private_key_string(key_string):
    """Parse ``key_string`` as an unencrypted PEM private key and return it."""
    backend = default_backend()
    return serialization.load_pem_private_key(
        key_string,
        password=None,
        backend=backend,
    )
def get_private_key_and_key_id(issuer, key_id=None):
    """
    Load the private key configured for ``issuer`` and return it with its id.

    The key file name is resolved from ``api_settings.PRIVATE_KEYS``; the
    returned tuple is ``(private key, key id derived from the file name)``.
    """
    key_file_name = get_key_file_name(
        keys=api_settings.PRIVATE_KEYS, issuer=issuer, key_id=key_id)
    pem_data = read_key_file(file_name=key_file_name)
    private_key = load_pem_private_key(
        pem_data, password=None, backend=default_backend())
    resolved_key_id = get_key_id(file_name=key_file_name)
    return private_key, resolved_key_id
def load_key(filename):
    """Read ``filename`` and return the unencrypted PEM private key it holds."""
    with open(filename, 'rb') as pem_file:
        pem_bytes = pem_file.read()
    return load_pem_private_key(pem_bytes, None, default_backend())
def convert_private_key_to_pem(private_key):
    """
    Load an unencrypted PEM private key given as bytes or as text.

    Non-bytes input is converted with ``.encode()`` before parsing.
    """
    pem_bytes = private_key if isinstance(private_key, bytes) else private_key.encode()
    return serialization.load_pem_private_key(
        pem_bytes,
        password=None,
        backend=default_backend()
    )
def convert_privkey_to_pem(key):
    """
    Load an unencrypted PEM private key from ``key``.

    ``key`` is first passed through ``ApiClient.convert_to_bytes``.
    """
    pem_bytes = ApiClient.convert_to_bytes(key)
    backend = default_backend()
    return serialization.load_pem_private_key(
        pem_bytes,
        password=None,
        backend=backend
    )
def load_rsa_private_key(*names):
    """
    Load an RSA private key test vector.

    The loader (PEM or DER) is picked by ``_guess_loader`` from the last
    element of ``names``; the vector itself is fetched with ``load_vector``.
    The result is wrapped in ``jose.ComparableRSAKey``.
    """
    loader = _guess_loader(
        names[-1],
        serialization.load_pem_private_key,
        serialization.load_der_private_key)
    vector = load_vector(*names)
    key = loader(vector, password=None, backend=default_backend())
    return jose.ComparableRSAKey(key)
def from_pem(cls, key_data, passphrase=None):
    """
    Alternate constructor: build an instance from PEM private key data.

    :param key_data: PEM data handed to ``load_pem_private_key``
    :param passphrase: password for the key, or None
    """
    backend = default_backend()
    loaded_key = serialization.load_pem_private_key(key_data, passphrase, backend)
    return cls(loaded_key)
def get_key(self):
    """
    Return the private key stored in ``self.key``.

    If no key is stored yet, ``set_key()`` and ``save()`` are called first.
    The PEM text is decrypted with ``settings.ACCOUNT_KEY_PASSWORD``.
    """
    key_password = settings.ACCOUNT_KEY_PASSWORD.encode()

    if not self.key:
        self.set_key()
        self.save()

    pem_bytes = self.key.encode()
    return serialization.load_pem_private_key(
        pem_bytes,
        password=key_password,
        backend=default_backend())
def _set_decryptor(
        self, plaintext_private_key=None, encrypted_private_key=None):
    '''Set ``self.decryptor`` from a PEM private key, unless already set.

    When no plaintext key is given, the encrypted key (read from
    ``self.encrypted_private_key_file`` if not passed in) is turned into
    plaintext via ``self._get_plaintext_private_key``.

    Raises ``errors.InputError`` if the key file cannot be read and
    ``errors.DecryptorError`` if the key cannot be loaded.
    '''
    if self.decryptor is not None:
        return

    if not plaintext_private_key:
        if not encrypted_private_key:
            try:
                with open(self.encrypted_private_key_file, 'rb') as key_file:
                    encrypted_private_key = key_file.read()
            except IOError:
                raise errors.InputError(
                    "private key '%s' does not exist." % (
                        self.encrypted_private_key_file))
        plaintext_private_key = self._get_plaintext_private_key(
            encrypted_private_key,
            encryption_context=self.encryption_context)

    try:
        loaded_key = serialization.load_pem_private_key(
            plaintext_private_key,
            password=None,
            backend=default_backend())
        self.decryptor = Decryptor(loaded_key)
    except Exception as e:
        raise errors.DecryptorError(
            "private key is malformed. (Reason: %s)" % (str(e)))
def _load_private(self, data):
    """
    Load a base64-encoded PEM private key into this instance.

    On success ``self._private`` holds the key and ``self._private_jwk`` a
    ``JWK`` imported from it. Any failure is logged, not raised.
    """
    try:
        pem_bytes = b64decode(data)
        self._private = serialization.load_pem_private_key(
            pem_bytes,
            password=None,
            backend=self._backend)
        jwk = JWK()
        self._private_jwk = jwk
        jwk.import_from_pyca(self._private)
    except Exception:  # pragma: no cover
        # Only the first 28 characters of the input are logged.
        LOGGER.error(
            'Failed to parse private key starting with: %s', data[:28])
def __init__(self, key: bytes, password: str=None):
    """
    Wrap a private key given as PEM bytes or as an already-loaded object.

    :param key: PEM bytes are parsed; any other object is stored as-is.
    :param password: optional password for PEM input (UTF-8 encoded).
    :raises RuntimeError: if the key's ``key_size`` is below 1023.
    """
    self._pub_key = None

    if not isinstance(key, bytes):
        self._hazmat_private_key = key
    else:
        passphrase = None
        if password:
            passphrase = password.encode('utf-8')
        self._hazmat_private_key = serialization.load_pem_private_key(
            key,
            password=passphrase,
            backend=default_backend()
        )

    # NOTE(review): the threshold is 1023 while the message says 1024 bits -
    # confirm whether 1023-bit keys are meant to be accepted.
    if self._hazmat_private_key.key_size < 1023:
        raise RuntimeError('Minimal key size is 1024bits')
def test_convert_from_cryptography_private_key(self):
    """
    A private ``PKey`` built via ``PKey.from_cryptography_key`` reports the
    same bit size as the source key and is marked private and initialized.
    """
    crypto_key = serialization.load_pem_private_key(
        intermediate_key_pem, None, backend
    )

    converted = PKey.from_cryptography_key(crypto_key)

    assert isinstance(converted, PKey)
    assert converted.bits() == crypto_key.key_size
    assert converted._only_public is False
    assert converted._initialized is True
def test_convert_from_cryptography_unsupported_type(self):
    """
    ``PKey.from_cryptography_key`` must raise ``TypeError`` when handed the
    key loaded from ``ec_private_key_pem``.
    """
    unsupported_key = serialization.load_pem_private_key(
        ec_private_key_pem, None, backend
    )

    with pytest.raises(TypeError):
        PKey.from_cryptography_key(unsupported_key)
def parse_issuer_cred(issuer_cred):
    """
    Split an X509 PEM string into its sections and validate them.

    Sections are delimited by
        -----BEGIN <label>-----
    and
        -----END <label>-----
    and must appear in proxy credential order: issuer cert, issuer private
    key, then a proxy chain of zero or more certs.

    Returns the issuer cert and private key as loaded cryptography objects
    and the proxy chain as a (possibly empty) string.

    Raises ValueError if fewer than two sections are found or if any section
    fails to decode.
    """
    pem_pattern = "-----BEGIN.*?-----.*?-----END.*?-----"
    pem_sections = re.findall(pem_pattern, issuer_cred, flags=re.DOTALL)

    # The first two sections are mandatory; everything after is the chain.
    try:
        cert_pem = pem_sections[0]
        key_pem = pem_sections[1]
    except IndexError:
        raise ValueError("Unable to parse PEM data in credentials, "
                         "make sure the X.509 file is in PEM format and "
                         "consists of the issuer cert, issuer private key, "
                         "and proxy chain (if any) in that order.")
    chain_pems = pem_sections[2:]

    # Validate that each section of data can be decoded as expected.
    try:
        loaded_cert = x509.load_pem_x509_certificate(
            six.b(cert_pem), default_backend())
        loaded_private_key = serialization.load_pem_private_key(
            six.b(key_pem), password=None, backend=default_backend())
        for chain_pem in chain_pems:
            x509.load_pem_x509_certificate(
                six.b(chain_pem), default_backend())
        issuer_chain = "".join(chain_pems)
    except ValueError:
        raise ValueError("Failed to decode PEM data in credentials. Make sure "
                         "the X.509 file consists of the issuer cert, "
                         "issuer private key, and proxy chain (if any) "
                         "in that order.")

    return loaded_cert, loaded_private_key, issuer_chain
def _set_private_key_string(self, private_key_string):
    """
    Load an unencrypted PEM private key and cache it with its public key.

    The input is passed through ``self.ensure_bytes`` before parsing.
    """
    pem_bytes = self.ensure_bytes(private_key_string)
    loaded_key = crypto_serialization.load_pem_private_key(
        pem_bytes,
        password=None,
        backend=crypto_backend,
    )
    self._private_key = loaded_key
    self._public_key = self._private_key.public_key()

# alias the most-preferred key wrapper class we have available as `Key`
def __load_cert(self, cert_dir):
    """Load a certificate/key pair and check that they belong together.

    ``cert.pem`` and ``key.pem`` are read from ``cert_dir``. The private key
    signs a fixed message with ECDSA/SHA-256 and the certificate's public
    key must verify that signature.

    :param cert_dir: directory containing ``cert.pem`` and ``key.pem``
    :return: the loaded X509 certificate, or None if the key password is
        wrong or the signature check fails
    """
    logging.debug("Cert/Key loading...")
    cert_file = join(cert_dir, "cert.pem")
    pri_file = join(cert_dir, "key.pem")

    # Context managers close the handles even if read() raises.
    with open(cert_file, "rb") as f:
        cert_bytes = f.read()
    cert = x509.load_pem_x509_certificate(cert_bytes, default_backend())

    with open(pri_file, "rb") as f:
        pri_bytes = f.read()

    try:
        pri = serialization.load_pem_private_key(pri_bytes, self.__PASSWD, default_backend())
    except ValueError:
        logging.debug("Invalid Password(%s)", cert_dir)
        return None

    data = b"test"
    signature = pri.sign(data, ec.ECDSA(hashes.SHA256()))

    try:
        pub_key = cert.public_key()
        # verify() signals failure by raising InvalidSignature and returns
        # None on success, so success is "no exception", not a truthy result.
        pub_key.verify(signature, data, ec.ECDSA(hashes.SHA256()))
    except InvalidSignature:
        logging.debug("sign test fail")
        logging.error("result is False ")
        return None

    return cert
def convert_privatekey_from_pem(self, pri_pem, password):
    """Convert PEM data into a private key object.

    If loading raises ``ValueError``, the cached CA certificate and CA
    private key are reset to None and None is returned.

    :param pri_pem: private key in PEM form
    :param password: password for the private key
    :return: private_key, or None on failure
    """
    try:
        loaded_key = serialization.load_pem_private_key(pri_pem, password, default_backend())
    except ValueError:
        logging.debug("Invalid Password")
        self.__ca_cert = None
        self.__ca_pri = None
        return None
    else:
        return loaded_key
def _init_protected_mode(self, config):
    """
    Enable protected mode when ``config`` contains ``CLOUDFRONT_KEY_ID``.

    In protected mode the CloudFront key is fetched via
    ``self._get_cloud_front_key``, parsed as an unencrypted PEM private key,
    and a ``CloudFrontSigner`` is created from the key id and
    ``self.rsa_signer``.
    """
    self.protected = 'CLOUDFRONT_KEY_ID' in config
    if not self.protected:
        return

    key_pem = self._get_cloud_front_key(config)
    self.key_id = config['CLOUDFRONT_KEY_ID']
    self.cloud_front_key = serialization.load_pem_private_key(
        key_pem,
        password=None,
        backend=default_backend(),
    )
    self.signer = CloudFrontSigner(self.key_id, self.rsa_signer)
def test_deserialization(self):
    """
    Round-trip a token through serialize/deserialize.

    A token signed under a key id passed to ``start_server`` must
    deserialize; one signed under an unknown key id must raise
    ``MissingKeyException``.
    """
    with open('tests/simple_private_key.pem', 'rb') as pem_file:
        signing_key = serialization.load_pem_private_key(
            pem_file.read(),
            password=None,
            backend=default_backend()
        )

    known_key_id = "stuffblah"
    numbers = signing_key.public_key().public_numbers()
    server_address = start_server(numbers.n, numbers.e, known_key_id)
    print(server_address)
    issuer = "http://localhost:{}/".format(server_address[1])

    # Token signed under the known key id.
    good_token = scitokens.SciToken(key=signing_key, key_id=known_key_id)
    good_token.update_claims({"test": "true"})
    good_serialized = good_token.serialize(issuer=issuer)

    segment_count = len(good_serialized.decode('utf8').split("."))
    self.assertEqual(segment_count, 3)

    parsed = scitokens.SciToken.deserialize(good_serialized, insecure=True)
    self.assertIsInstance(parsed, scitokens.SciToken)

    # Token signed under a key id that was not passed to start_server.
    bad_token = scitokens.SciToken(key=signing_key, key_id="doesnotexist")
    bad_serialized = bad_token.serialize(issuer=issuer)

    with self.assertRaises(scitokens.utils.errors.MissingKeyException):
        parsed = scitokens.SciToken.deserialize(bad_serialized, insecure=True)
def _test_private(key):
    """Parse ``key`` as an unencrypted PEM private key and return the result."""
    backend = default_backend()
    return serialization.load_pem_private_key(key, password=None, backend=backend)
def load_privatekey(pkey_file):
    """Read ``pkey_file`` and return the unencrypted PEM private key in it."""
    with open(pkey_file, 'rb') as pem_handle:
        pem_bytes = pem_handle.read()
        return serialization.load_pem_private_key(
            data=pem_bytes,
            password=None,
            backend=default_backend()
        )
def load(cls, path):
    """
    Load an unencrypted PEM private key from ``path``.

    The file is opened with ``openfile`` and parsed with ``cls.__backend__``.
    When ``cls.__type__`` is None the key is wrapped via ``PrivateKey.new``;
    otherwise an instance of ``cls`` is returned.
    """
    with openfile(path, 'rb') as key_file:
        pem_bytes = key_file.read()
        loaded_key = serialization.load_pem_private_key(
            pem_bytes, password=None, backend=cls.__backend__)

    if cls.__type__ is None:
        return PrivateKey.new(loaded_key)
    return cls(loaded_key)
def loadPrivKey(pem):
    """
    Build a cryptography private key object from a PEM string.

    :param pem: A private key as a PEM string; it is UTF-8 encoded before
        parsing.
    :return: A cryptography private key object.
    """
    pem_bytes = pem.encode("utf-8")
    backend = default_backend()
    return load_pem_private_key(pem_bytes, None, backend)