我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 cryptography.hazmat.primitives.hashes.SHA256。
def encrypt(message, receiver_public_key):
    """Encrypt ``message`` for the holder of ``receiver_public_key``.

    A fresh SECP256K1 key pair is generated on every call. An ECDH exchange
    with the receiver's public key yields a shared secret, from which a
    32-byte AES key is derived with X9.63 KDF (SHA-256). The message is then
    encrypted with AES-GCM.

    :param message: plaintext bytes to encrypt
    :param receiver_public_key: elliptic-curve public key of the receiver
    :return: encoded sender public point + GCM tag + ciphertext, concatenated
    """
    sender_private_key = ec.generate_private_key(ec.SECP256K1(), backend)
    shared_key = sender_private_key.exchange(ec.ECDH(), receiver_public_key)
    sender_public_key = sender_private_key.public_key()
    point = sender_public_key.public_numbers().encode_point()

    # Bytes literals: cryptography rejects text strings for the IV and for
    # sharedinfo on Python 3. On Python 2 these are the same values as before.
    # The IV is constant, but the AES key comes from a key pair that is
    # generated anew on every call.
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',
        backend=backend
    )
    key = xkdf.derive(shared_key)

    encryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv),
        backend=backend
    ).encryptor()
    ciphertext = encryptor.update(message) + encryptor.finalize()

    return point + encryptor.tag + ciphertext
def verify_hmac(expected_result, secret_key, unique_prefix, data):
    '''
    Check an HMAC computed with CryptoHash over unique_prefix | data.

    The MAC is keyed with secret_key, fed unique_prefix followed by data,
    and compared against the bytes in expected_result.
    The key must be kept secret. The prefix ensures hash uniqueness.
    Returns True if the signature matches, and False if it does not.
    '''
    # A secret key shorter than the digest size would reduce security
    assert secret_key
    assert len(secret_key) >= CryptoHash.digest_size
    mac = hmac.HMAC(bytes(secret_key), CryptoHash(), backend=default_backend())
    for part in (unique_prefix, data):
        mac.update(bytes(part))
    try:
        mac.verify(bytes(expected_result))
    except InvalidSignature:
        return False
    return True
def generate_cert(key_path, cert_out_path, valid_days=365):
    """Create a self-signed certificate for a private key and write it as PEM.

    The private key is loaded with ``load_private_key_file``; the certificate
    carries that key's public half and is signed by the same private key
    using SHA-256.

    :param key_path: path handed to ``load_private_key_file``
    :param cert_out_path: file the PEM-encoded certificate is written to
    :param valid_days: number of days from now until the certificate expires.
        The previous fixed expiry of 2020-01-01 now lies before the start of
        validity, which makes the builder raise ``ValueError``.
    """
    private_key = load_private_key_file(key_path)
    public_key = private_key.public_key()

    # cryptography interprets naive datetimes as UTC, so use UTC "now"
    # rather than local time.
    now = datetime.datetime.utcnow()

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount User'),
    ]))
    builder = builder.issuer_name(x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount Authority'),
    ]))
    # Start one day in the past to tolerate clock skew between hosts
    builder = builder.not_valid_before(now - datetime.timedelta(days=1))
    builder = builder.not_valid_after(
        now + datetime.timedelta(days=valid_days))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True)

    certificate = builder.sign(private_key=private_key,
                               algorithm=hashes.SHA256(),
                               backend=default_backend())

    pem = certificate.public_bytes(encoding=serialization.Encoding.PEM)
    with open(cert_out_path, 'wb') as outf:
        outf.write(pem)
        # The former print statement appended a newline after the PEM data;
        # keep the file contents identical.
        outf.write(b'\n')
def decrypt(message, receiver_private_key):
    """Decrypt a message laid out as point + tag + ciphertext.

    The first 65 bytes are read as the sender's encoded SECP256K1 public
    point, the next 16 bytes as the AES-GCM tag, and the remainder as the
    ciphertext. An ECDH exchange with ``receiver_private_key`` yields the
    shared secret, from which the 32-byte AES key is derived with X9.63 KDF
    (SHA-256).

    :param message: bytes in the layout described above
    :param receiver_private_key: elliptic-curve private key of the receiver
    :return: the decrypted plaintext bytes
    """
    point_len = 65
    tag_len = 16
    point = message[:point_len]
    tag = message[point_len:point_len + tag_len]
    ciphertext = message[point_len + tag_len:]

    sender_public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(
        ec.SECP256K1(), point)
    sender_public_key = sender_public_numbers.public_key(backend)
    shared_key = receiver_private_key.exchange(ec.ECDH(), sender_public_key)

    # Bytes literals: cryptography rejects text strings for the IV and for
    # sharedinfo on Python 3. On Python 2 these are the same values as before.
    iv = b'000000000000'
    xkdf = x963kdf.X963KDF(
        algorithm=hashes.SHA256(),
        length=32,
        sharedinfo=b'',
        backend=backend
    )
    key = xkdf.derive(shared_key)

    decryptor = Cipher(
        algorithms.AES(key),
        modes.GCM(iv, tag),
        backend=backend
    ).decryptor()
    return decryptor.update(ciphertext) + decryptor.finalize()
def _encrypt_from_parts(self, data, current_time, iv): if not isinstance(data, bytes): raise TypeError("data must be bytes.") padder = padding.PKCS7(algorithms.AES.block_size).padder() padded_data = padder.update(data) + padder.finalize() encryptor = Cipher( algorithms.AES(self._encryption_key), modes.CBC(iv), self._backend ).encryptor() ciphertext = encryptor.update(padded_data) + encryptor.finalize() basic_parts = ( b"\x80" + struct.pack(">Q", current_time) + iv + ciphertext ) h = HMAC(self._signing_key, hashes.SHA256(), backend=self._backend) h.update(basic_parts) hmac = h.finalize() return base64.urlsafe_b64encode(basic_parts + hmac)
def __init__(self, key, length, algorithm, backend):
    """Validate and store the HOTP parameters.

    :raises UnsupportedAlgorithm: if ``backend`` is not an ``HMACBackend``
    :raises ValueError: if ``key`` is shorter than 16 bytes, or ``length``
        is outside 6..8
    :raises TypeError: if ``length`` is not an integer, or ``algorithm`` is
        not an instance of SHA1, SHA256 or SHA512
    """
    backend_ok = isinstance(backend, HMACBackend)
    if not backend_ok:
        raise UnsupportedAlgorithm(
            "Backend object does not implement HMACBackend.",
            _Reasons.BACKEND_MISSING_INTERFACE
        )

    # 16 bytes == 128 bits
    if len(key) < 16:
        raise ValueError("Key length has to be at least 128 bits.")

    if not isinstance(length, six.integer_types):
        raise TypeError("Length parameter must be an integer type.")

    if not 6 <= length <= 8:
        raise ValueError("Length of HOTP has to be between 6 to 8.")

    supported = (SHA1, SHA256, SHA512)
    if not isinstance(algorithm, supported):
        raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

    self._key, self._length = key, length
    self._algorithm, self._backend = algorithm, backend
def post(self):
    """Handle a login form submission.

    Looks up the user named by the ``name`` argument. If found, the
    ``password`` argument is checked against the stored hash with
    PBKDF2-HMAC-SHA256 (salted with ``self.settings['secret']``). On success
    the ``user`` secure cookie is set and the client is redirected to
    ``/dashboard``; otherwise the client is redirected back to ``/login``
    with a ``status`` of ``wrongpassword`` or ``usernotfound``.
    """
    name = self.get_argument("name")
    user = DB.session.query(User).filter(User.name == name).one_or_none()
    if not user:
        self.redirect("/login?status=usernotfound")
        return

    password = self.get_argument("password").encode('utf-8')
    kpdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=self.settings['secret'].encode('utf-8'),
        iterations=1000000,
        backend=password_backend
    )
    try:
        kpdf.verify(password, user.password)
    except InvalidKey:
        # Deliberately do not print the submitted password or the stored
        # hash: both are credentials and must not end up in stdout/logs.
        self.redirect("/login?status=wrongpassword")
        return

    self.set_secure_cookie("user", name)
    self.redirect("/dashboard")
def sign(self, msg):
    """Return the base64-encoded signature of a message.

    The message is encoded to bytes and signed by ``self.privkey_pem`` with
    PKCS#1 v1.5 padding and SHA-256.

    Taken from https://github.com/madeddie/python-bunq - Thanks!

    :param msg: data to be signed, usually action, headers and body
    :type msg: str
    :return: base64 text of the signature
    """
    raw_signature = self.privkey_pem.sign(
        msg.encode(),
        padding.PKCS1v15(),
        hashes.SHA256()
    )
    encoded = base64.b64encode(raw_signature)
    return encoded.decode()
def kdf_rfc5869_derive(secret_input_bytes, output_len,
                       m_expand=M_EXPAND_NTOR, t_key=T_KEY_NTOR):
    '''
    Derive output_len bytes from secret_input_bytes with HKDF-SHA256
    (RFC5869), using m_expand as the info value and t_key as the salt.

    No matching verification function exists, because only the nonce part
    of the KDF result is verified directly.
    See https://gitweb.torproject.org/torspec.git/tree/tor-spec.txt#n1026

    Returns the derived bytes as a bytearray.
    '''
    kdf = hkdf.HKDF(
        algorithm=hashes.SHA256(),
        length=output_len,
        info=bytes(m_expand),
        salt=bytes(t_key),
        backend=backends.default_backend(),
    )
    derived = kdf.derive(bytes(secret_input_bytes))
    assert len(derived) == output_len
    return bytearray(derived)
def check_password(self, password):
    """Return True if ``password`` matches this object's stored hash.

    The candidate is UTF-8 encoded and verified against
    ``self.get_password()`` with PBKDF2-HMAC-SHA256 salted with ``SECRET``.
    On mismatch the ``InvalidKey`` exception is printed and False is returned.
    """
    verifier = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SECRET,
        iterations=1000000,
        backend=password_backend
    )
    try:
        verifier.verify(password.encode('utf-8'), self.get_password())
    except InvalidKey as mismatch:
        print(mismatch)
        return False
    return True
def create_master(password):
    """Create the user named 'master' with ROOT permissions and persist it.

    The password is UTF-8 encoded and stored as a 32-byte
    PBKDF2-HMAC-SHA256 derivation salted with ``SECRET``.
    """
    hasher = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SECRET,
        iterations=1000000,
        backend=password_backend
    )

    master = User()
    master.name = 'master'
    master.when = datetime.datetime.now()
    master.set_permissions(Permissions.ROOT)

    password_hash = hasher.derive(password.encode('utf-8'))
    master.set_password(password_hash)

    session = DB.session
    session.add(master)
    session.commit()
def create_user(name, password):
    """Create a user with USER permissions and persist it.

    The password is UTF-8 encoded and stored as a 32-byte
    PBKDF2-HMAC-SHA256 derivation salted with ``SECRET``.
    """
    hasher = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=SECRET,
        iterations=1000000,
        backend=password_backend
    )

    account = User()
    account.name = name
    account.when = datetime.datetime.now()
    account.set_permissions(Permissions.USER)

    password_hash = hasher.derive(password.encode('utf-8'))
    account.set_password(password_hash)

    session = DB.session
    session.add(account)
    session.commit()
def __init__(self, key, length, algorithm, backend,
             enforce_key_length=True):
    """Validate and store the HOTP parameters.

    The minimum key length check is only applied when
    ``enforce_key_length`` is exactly ``True``.

    :raises UnsupportedAlgorithm: if ``backend`` is not an ``HMACBackend``
    :raises ValueError: if the key is shorter than 16 bytes (when
        enforced), or ``length`` is outside 6..8
    :raises TypeError: if ``length`` is not an integer, or ``algorithm`` is
        not an instance of SHA1, SHA256 or SHA512
    """
    backend_ok = isinstance(backend, HMACBackend)
    if not backend_ok:
        raise UnsupportedAlgorithm(
            "Backend object does not implement HMACBackend.",
            _Reasons.BACKEND_MISSING_INTERFACE
        )

    # 16 bytes == 128 bits
    if len(key) < 16 and enforce_key_length is True:
        raise ValueError("Key length has to be at least 128 bits.")

    if not isinstance(length, six.integer_types):
        raise TypeError("Length parameter must be an integer type.")

    if not 6 <= length <= 8:
        raise ValueError("Length of HOTP has to be between 6 to 8.")

    supported = (SHA1, SHA256, SHA512)
    if not isinstance(algorithm, supported):
        raise TypeError("Algorithm must be SHA1, SHA256 or SHA512.")

    self._key, self._length = key, length
    self._algorithm, self._backend = algorithm, backend
def sign_request(key, header, protected_header, payload):
    """
    Creates a JSON Web Signature for the request header and payload using the
    specified account key.

    ``protected_header`` and ``payload`` are JSON-serialized and encoded with
    ``jose_b64``; the signature covers ``protected + "." + payload`` and is
    made with PKCS#1 v1.5 padding and SHA-256.

    :return: JSON text with ``header``, ``protected``, ``payload`` and
        ``signature`` members
    """
    protected = jose_b64(json.dumps(protected_header).encode('utf8'))
    payload = jose_b64(json.dumps(payload).encode('utf8'))

    # One-shot sign() replaces the signer() streaming context, which is no
    # longer available in current cryptography releases. The bytes signed
    # are exactly what the three update() calls used to feed in.
    signing_input = b'.'.join(
        [protected.encode('ascii'), payload.encode('ascii')])
    signature = key.sign(signing_input, padding.PKCS1v15(), hashes.SHA256())

    return json.dumps({
        'header': header,
        'protected': protected,
        'payload': payload,
        'signature': jose_b64(signature),
    })
def create_csr(key, domains, must_staple=False):
    """
    Build and sign a certificate signing request for the given key and
    domain names, then pass it through ``export_csr_for_acme``.

    The first domain becomes the subject common name; every domain is listed
    in the subject alternative name extension. With ``must_staple`` set, a
    TLS feature extension requesting ``status_request`` is added as well.
    """
    assert domains

    common_name = x509.NameAttribute(NameOID.COMMON_NAME, domains[0])
    subject = x509.Name([common_name])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request])
        builder = builder.add_extension(staple, critical=False)

    request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(request)
def _gen_key_initctr(cls, b_password, b_salt):
    """Derive two keys and a hex-encoded IV from a password and salt.

    Uses PBKDF2-HMAC-SHA256 (10000 iterations) when ``HAS_PBKDF2HMAC`` is
    truthy, otherwise falls back to ``cls._create_key``.

    :return: tuple ``(b_key1, b_key2, hexlify(b_iv))``
    """
    # 16 for AES 128, 32 for AES256
    key_len = 32
    # match the size used for counter.new to avoid extra work
    iv_len = 16

    if HAS_PBKDF2HMAC:
        backend = default_backend()
        derived = PBKDF2HMAC(
            algorithm=c_SHA256(),
            length=2 * key_len + iv_len,
            salt=b_salt,
            iterations=10000,
            backend=backend,
        ).derive(b_password)
    else:
        derived = cls._create_key(b_password, b_salt, key_len, iv_len)

    # Derived material layout: key1 | key2 | iv
    second_start = key_len
    iv_start = key_len * 2
    b_key1 = derived[:second_start]
    b_key2 = derived[second_start:iv_start]
    b_iv = derived[iv_start:iv_start + iv_len]
    return b_key1, b_key2, hexlify(b_iv)
def kdf(key_material):
    """NIST SP 800-56a Concatenation Key Derivation Function (see section 5.8.1).

    SHA-256 digests of (big-endian 32-bit counter | key_material) are
    concatenated for counter = 1, 2, ... and the result is truncated to
    KEY_LEN bytes.

    Pretty much copied from geth's implementation:
    https://github.com/ethereum/go-ethereum/blob/673007d7aed1d2678ea3277eceb7b55dc29cf092/crypto/ecies/ecies.go#L167
    """
    block_size = hashes.SHA256().block_size
    # Floor division gives the same number of rounds as the comparison
    # against the (possibly fractional) quotient did.
    reps = ((KEY_LEN + 7) * 8) // (block_size * 8)

    digests = []
    for counter in range(1, reps + 2):
        ctx = sha256()
        ctx.update(struct.pack('>I', counter))
        ctx.update(key_material)
        digests.append(ctx.digest())
    return b"".join(digests)[:KEY_LEN]
def sign_test(self):
    """Check that the CA private key and CA certificate belong together.

    Signs a fixed test payload with the private key (ECDSA / SHA-256) and
    verifies the signature with the certificate's public key.

    :return: True if the signature verifies; False if it does not, or if
        ``self.is_secure`` is False
    """
    if self.is_secure is False:
        logging.debug("CA is not secure_mode")
        return False

    data = b"test"
    signature = self.__ca_pri.sign(
        data,
        ec.ECDSA(hashes.SHA256())
    )

    try:
        pub_key = self.__ca_cert.public_key()
        # verify() returns None on success and raises on failure, so its
        # result must not be returned directly (that would report success
        # as a falsy value).
        pub_key.verify(
            signature,
            data,
            ec.ECDSA(hashes.SHA256())
        )
    except InvalidSignature:
        logging.debug("cert test fail!!!")
        return False
    return True
def sign_data(self, data: bytes) -> bytes:
    """Sign ``data`` with the CA private key.

    Elliptic-curve keys sign with ECDSA / SHA-256; RSA keys sign with
    PKCS#1 v1.5 padding / SHA-256.

    :param data: bytes to sign
    :return: the signature, or None if the private key type is not
        recognised (a debug message is logged in that case)
    """
    if isinstance(self.__ca_pri, ec.EllipticCurvePrivateKeyWithSerialization):
        # One-shot sign(), matching the RSA branch; the signer() streaming
        # context is no longer available in current cryptography releases.
        return self.__ca_pri.sign(
            data,
            ec.ECDSA(hashes.SHA256())
        )
    elif isinstance(self.__ca_pri, rsa.RSAPrivateKeyWithSerialization):
        return self.__ca_pri.sign(
            data,
            padding.PKCS1v15(),
            hashes.SHA256()
        )
    else:
        logging.debug("Unknown PrivateKey Type : %s", type(self.__ca_pri))
        return None
def get_channel(self):
    """Fetch the channel assigned to this device into ``self.channel_uri``.

    A token is requested from the service, signed with the private key at
    ``CloudLinkSettings.PRIVATE_KEY_LOCATION`` (PKCS#1 v1.5 / SHA-256), and
    the signature is sent with the device-group request.

    :raises Exception: if the service answers 400 (UUID or token not
        registered), 403 (signature rejected), or any other non-success
        status
    """
    # First get a token we need to sign in order to prove we are who we
    # say we are
    r = requests.get(str(self.base_link_uri) + "/get_device_token",
                     params={"UUID": self.uuid})
    token = r.json()["token"]

    # Get the private key. Binary mode: the PEM loader expects bytes.
    with open(CloudLinkSettings.PRIVATE_KEY_LOCATION, 'rb') as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=CloudLinkSettings.PRIVATE_KEY_PASSPHRASE,
            backend=default_backend())

    # Sign the token with our private key. bytes(token) fails for text on
    # Python 3, so encode explicitly; sign() replaces the signer()
    # streaming context, which current cryptography releases no longer have.
    token_bytes = str(token).encode('utf-8')
    signature = private_key.sign(
        token_bytes, padding.PKCS1v15(), hashes.SHA256())

    # Get the randomly assigned channel for my UUID
    r = requests.get(str(self.base_link_uri) + "/get_device_group",
                     params={"UUID": self.uuid,
                             "signature": b64encode(signature),
                             "format": "PKCS1_v1_5"})
    if r.ok:
        self.channel_uri = r.json()["channel"]
    elif r.status_code == 400:
        raise Exception("UUID or Token not registered with Cloud.")
    elif r.status_code == 403:
        raise Exception("Signature didn't verify correctly. Bad private key or signature.")
    else:
        # Previously ignored, which left channel_uri unset with no signal
        raise Exception(
            "Unexpected response from Cloud: HTTP %s" % r.status_code)
def load_or_create_crl(crl_file, ca_crt, pkey):
    """Return the CRL stored at ``crl_file``, creating it if absent.

    When the file exists it is parsed as a PEM CRL and returned. Otherwise
    an empty CRL is built with ``ca_crt.subject`` as issuer, a next-update
    time ten years (365 * 10 days) ahead, signed by ``pkey`` with SHA-256,
    written to ``crl_file`` as PEM, and returned.
    """
    if os.path.isfile(crl_file):
        with open(crl_file, 'rb') as existing:
            return x509.load_pem_x509_crl(
                data=existing.read(),
                backend=default_backend()
            )

    builder = x509.CertificateRevocationListBuilder()
    builder = builder.issuer_name(ca_crt.subject)
    builder = builder.last_update(datetime.datetime.utcnow())
    builder = builder.next_update(
        datetime.datetime.utcnow() + datetime.timedelta(days=365 * 10)
    )
    crl = builder.sign(
        private_key=pkey,
        algorithm=hashes.SHA256(),
        backend=default_backend()
    )

    pem = crl.public_bytes(encoding=serialization.Encoding.PEM)
    with open(crl_file, 'wb') as created:
        created.write(pem)
    return crl
def _user_cert_validation(cert_str):
    """Ask the user to confirm the cluster certificate's fingerprint.

    The PEM text is parsed, its SHA-256 fingerprint is rendered as
    colon-separated upper-case hex pairs, and the result is shown through
    ``confirm``.

    :param cert_str: cluster certificate bundle
    :type cert_str: str
    :returns: whether or not user validated cert
    :rtype: bool
    """
    pem_bytes = cert_str.encode('utf-8')
    cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())

    digest = cert.fingerprint(hashes.SHA256())
    hex_pairs = ["{:02x}".format(octet) for octet in digest]
    pretty = ":".join(hex_pairs).upper()

    prompt = "SHA256 fingerprint of cluster certificate bundle:\n{}".format(
        pretty)
    return confirm(prompt, False)
def __init__(self):
    """Generate an RSA key and a certificate signed with that same key.

    The certificate is built by ``_cert_builder_common`` with one randomly
    suffixed "Testing CA #..." name passed for both name arguments, marked
    as a CA (path length 9), and signed with SHA-256. The PEM encoding is
    kept in ``self.cert_pem``.
    """
    backend = default_backend()
    self._private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=_KEY_SIZE,
        backend=backend
    )

    ca_name = _name(u"Testing CA #" + random_text())

    builder = _cert_builder_common(
        ca_name, ca_name, self._private_key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=9),
        critical=True,
    )
    self._certificate = builder.sign(
        private_key=self._private_key,
        algorithm=hashes.SHA256(),
        backend=default_backend(),
    )

    pem = self._certificate.public_bytes(Encoding.PEM)
    self.cert_pem = Blob(pem)
def show_Certificate(cert, short=False):
    """
    Print the MD5, SHA1, SHA256 and SHA512 fingerprints of an X509
    certificate, followed by its issuer and subject.

    :param cert: X509 Certificate to print
    :param short: Print in shortform for DN (Default: False)
    :type cert: :class:`cryptography.x509.Certificate`
    :type short: Boolean
    """
    digest_classes = (hashes.MD5, hashes.SHA1, hashes.SHA256, hashes.SHA512)
    for digest_cls in digest_classes:
        raw = cert.fingerprint(digest_cls())
        hex_text = binascii.hexlify(raw).decode("ascii")
        print("{}: {}".format(digest_cls.name, hex_text))

    issuer_text = get_Name(cert.issuer, short=short)
    print("Issuer: {}".format(issuer_text))
    subject_text = get_Name(cert.subject, short=short)
    print("Subject: {}".format(subject_text))


################################## AXML FORMAT ########################################
# Translated from
# http://code.google.com/p/android4me/source/browse/src/android/content/res/AXmlResourceParser.java
def encrypt_key_with_public_key(secret_key, public_encryption_key):
    """
    Encrypt a secret key under a public key using OAEP padding
    (MGF1 with SHA-256, SHA-256 as the OAEP hash, no label).

    :param bytes secret_key: the key to encrypt
    :param public_encryption_key: the public encryption key
    :type public_encryption_key: :class:`~rsa.RSAPublicKey`
    :return: the encrypted key
    :rtype: bytes
    """
    mask_gen = padding.MGF1(algorithm=hashes.SHA256())
    oaep = padding.OAEP(mgf=mask_gen, algorithm=hashes.SHA256(), label=None)
    return public_encryption_key.encrypt(secret_key, oaep)
def decrypt_with_private_key(secret_key, private_encryption_key):
    """
    Decrypt an encrypted secret key with a private key using OAEP padding
    (MGF1 with SHA-256, SHA-256 as the OAEP hash, no label).

    :param bytes secret_key: the secret key to decrypt
    :param private_encryption_key: the private encryption key
    :type private_encryption_key: :class:`~rsa.RSAPrivateKey`
    :return: the decrypted key
    :rtype: bytes
    """
    mask_gen = padding.MGF1(algorithm=hashes.SHA256())
    oaep = padding.OAEP(mgf=mask_gen, algorithm=hashes.SHA256(), label=None)
    return private_encryption_key.decrypt(secret_key, oaep)
def __init__(self, curve_class, nist_name):
    """Record the curve, its NIST name, and the hash that goes with it.

    The hash class is picked from the curve's key size: SHA256 up to 256
    bits, SHA384 up to 384 bits, SHA512 above that.
    """
    self.nist_name = nist_name
    self.key_length = curve_class.key_size

    # Defined in RFC 5656 6.2
    self.key_format_identifier = "ecdsa-sha2-" + self.nist_name

    # Defined in RFC 5656 6.2.1
    if self.key_length > 384:
        chosen_hash = hashes.SHA512
    elif self.key_length > 256:
        chosen_hash = hashes.SHA384
    else:
        chosen_hash = hashes.SHA256
    self.hash_object = chosen_hash

    self.curve_class = curve_class
def rsa(public_key, signature, message):
    """Verifies an RSA signature.

    The message is first passed through ``util.sha256`` and the result is
    handed to ``verify`` with PSS padding (MGF1/SHA-256, maximum salt
    length) and SHA-256.
    NOTE(review): ``verify`` is given the already-hashed value together with
    a hash algorithm - confirm this matches what the signer does.

    Args:
        public_key (str): Public key with BEGIN and END sections.
        signature (str): Hex value of the signature with its leading 0x stripped.
        message (str): Message that was signed, unhashed.

    Raises:
        Exception: with message 'Invalid signature' if verification fails.
    """
    try:
        verifying_key = load_pem_public_key(
            bytes(public_key), backend=default_backend())
        digest = util.sha256(message)
        raw_signature = binascii.unhexlify(signature)
        pss = padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        )
        verifying_key.verify(raw_signature, digest, pss, hashes.SHA256())
    except InvalidSignature:
        raise Exception('Invalid signature')