我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用cryptography.x509.load_der_x509_certificate()。
def load_certificate(self, cert_name): # Load the raw certificate file data. path = os.path.join(self.cert_path, cert_name) with open(path, 'rb') as cert_file: data = cert_file.read() # Convert the raw certificate data into a certificate object, first # as a PEM-encoded certificate and, if that fails, then as a # DER-encoded certificate. If both fail, the certificate cannot be # loaded. try: return x509.load_pem_x509_certificate(data, default_backend()) except Exception: try: return x509.load_der_x509_certificate(data, default_backend()) except Exception: raise exception.SignatureVerificationError( "Failed to load certificate: %s" % path )
def verify_peer_token(self, peer_token, peer, peer_type): token_time = peer_token[:16] token_sign = peer_token[16:] current_date = int(datetime.datetime.now().timestamp() * 1000) token_date = int(token_time, 16) if current_date > token_date: return False date = bytes.fromhex(token_time) peer_info = b''.join([peer.peer_id.encode('utf-8'), peer.target.encode('utf-8'), peer.group_id.encode('utf-8')]) + bytes([peer_type]) peer_cert = x509.load_der_x509_certificate(bytes.fromhex(peer.cert), default_backend()) peer_pub = peer_cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo) token_bytes = peer_info + date + peer_pub logging.debug("TBS Token(V) : %s", token_bytes.hex()) signature = bytes.fromhex(token_sign) return self.verify_data_with_cert(cert=self.__ca_cert, data=token_bytes, signature=signature)
def load_certificate(cert_file=None, cert_bytes=None): if cert_file: with open(cert_file, 'rb') as f: data = f.read() else: data = cert_bytes if '-----BEGIN'.encode() in data: cert = x509.load_pem_x509_certificate( data=data, backend=default_backend() ) else: cert = x509.load_der_x509_certificate( data=data, backend=default_backend() ) return cert
def get_certificate(self, filename): """ Return a certificate object by giving the name in the apk file """ pkcs7message = self.get_file(filename) message, _ = decode(pkcs7message) cert = encode(message[1][3]) # Remove the first identifier· # byte 0 == identifier, skip # byte 1 == length. If byte1 & 0x80 > 1, we have long format # The length of to read bytes is then coded # in byte1 & 0x7F cert = cert[2 + (cert[1] & 0x7F) if cert[1] & 0x80 > 1 else 2:] certificate = x509.load_der_x509_certificate(cert, default_backend()) return certificate
def get_certificate_chain(self, certificate): cert = requests.get(certificate) if cert.status_code != 200: raise ClientError('Certificate fetch failed: {}'.format( cert.json()['detail'])) if 'up' not in cert.links: return chain_url = urllib.parse.urljoin(certificate, cert.links['up']['url']) chain = requests.get(chain_url) if chain.status_code != 200: raise ClientError('Certificate chain fetch failed: {}'.format( cert.json()['detail'])) return x509.load_der_x509_certificate(chain.content, backend)
def load_der_certificate(data): """ Loads a DER X.509 certificate. """ return x509.load_der_x509_certificate(data, default_backend())
def _ValidatePubkeyGeneric(self, signing_cert, digest_alg, payload, enc_digest): cert = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend()) pubkey = cert.public_key() if isinstance(pubkey, RSAPublicKey): verifier = pubkey.verifier(enc_digest, padding.PKCS1v15(), cert.signature_hash_algorithm) else: verifier = pubkey.verifier(enc_digest, cert.signature_hash_algorithm) verifier.update(payload) try: verifier.verify() return True except: return False
def ValidateCertificateSignature(self, signed_cert, signing_cert): """Given a cert signed by another cert, validates the signature.""" # First the naive way -- note this does not check expiry / use etc. signed = x509.load_der_x509_certificate(der_encoder.encode(signed_cert), default_backend()) signing = x509.load_der_x509_certificate(der_encoder.encode(signing_cert), default_backend()) verifier = signing.public_key().verifier(signed.signature, padding.PKCS1v15(), signed.signature_hash_algorithm) verifier.update(signed.tbs_certificate_bytes) try: verifier.verify() except Exception as e: raise Asn1Error('1: Validation of cert signature failed: {}'.format(e))
def convert_x509cert(self, cert_bytes): """???? ???? x509 ???? ?? :param cert_bytes: ??? bytes :return: x509 ??? """ return x509.load_der_x509_certificate(cert_bytes, default_backend())
def verify_data_with_dercert(self, cert_der, data: bytes, signature: bytes) -> bool: """ ?? ? ??? ?? :param cert_der: ???(der bytes) :param data: ?? ?? :param signature: ?? :return: ?? ??(True/False) """ cert = x509.load_der_x509_certificate(cert_der, default_backend()) return self.verify_data_with_cert(cert=cert, data=data, signature=signature)
def verify_certificate_der(self, der_cert): """ ??? ?? :param der_cert: DER ??? ????? :return: ?? ?? """ cert = x509.load_der_x509_certificate(der_cert, default_backend()) return self.verify_certificate(cert)
def generate_peer_token(self, peer_sign, peer_cert, peer_id, peer_target, group_id, peer_type, rand_key, token_interval): peer_info = b''.join([peer_id.encode('utf-8'), peer_target.encode('utf-8'), group_id.encode('utf-8')]) + bytes([peer_type]) data = peer_info + rand_key cert = x509.load_der_x509_certificate(peer_cert, default_backend()) if self.verify_data_with_cert(cert=cert, data=data, signature=peer_sign): time = datetime.datetime.now() + datetime.timedelta(minutes=token_interval) date = int(time.timestamp() * 1000).to_bytes(length=8, byteorder='big') peer_pub = cert.public_key().public_bytes(encoding=serialization.Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo) # token_bytes = peer_id || peer_target || group_id || peer_type || peer_pub token_bytes = peer_info + date + peer_pub logging.debug("TBS Token[%s]", token_bytes.hex()) # token = date || CA_Sign(token_bytes) signed_token = self.sign_data(token_bytes) token = b''.join([date, signed_token]).hex() return token else: logging.debug('?? ?? ?? ?? ??? ?? ??') return None
def get_certificate(context, signature_certificate_uuid): """Create the certificate object from the retrieved certificate data. :param context: the user context for authentication :param signature_certificate_uuid: the uuid to use to retrieve the certificate :returns: the certificate cryptography object :raises: SignatureVerificationError if the retrieval fails or the format is invalid """ keymgr_api = key_manager.API() try: # The certificate retrieved here is a castellan certificate object cert = keymgr_api.get(context, signature_certificate_uuid) except KeyManagerError as e: # The problem encountered may be backend-specific, since castellan # can use different backends. Rather than importing all possible # backends here, the generic "Exception" is used. msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s") % {'id': signature_certificate_uuid, 'e': encodeutils.exception_to_unicode(e)}) LOG.error(msg) raise exception.SignatureVerificationError( reason=_('Unable to retrieve certificate with ID: %s') % signature_certificate_uuid) if cert.format not in CERTIFICATE_FORMATS: raise exception.SignatureVerificationError( reason=_('Invalid certificate format: %s') % cert.format) if cert.format == X_509: # castellan always encodes certificates in DER format cert_data = cert.get_encoded() certificate = x509.load_der_x509_certificate(cert_data, default_backend()) # verify the certificate verify_certificate(certificate) return certificate
def get_certificate(self, certificate): cert = requests.get(certificate) if cert.status_code != 200: raise ClientError('Certificate fetch failed: {}'.format( cert.json()['detail'])) return x509.load_der_x509_certificate(cert.content, backend)
def get_certificate(context, signature_certificate_uuid): """Create the certificate object from the retrieved certificate data. :param context: the user context for authentication :param signature_certificate_uuid: the uuid to use to retrieve the certificate :returns: the certificate cryptography object :raises: SignatureVerificationError if the retrieval fails or the format is invalid """ keymgr_api = key_manager.API() try: # The certificate retrieved here is a castellan certificate object cert = keymgr_api.get(context, signature_certificate_uuid) except ManagedObjectNotFoundError as e: raise exception.SignatureVerificationError( reason=_('Certificate not found with ID: %s') % signature_certificate_uuid) except KeyManagerError as e: # The problem encountered may be backend-specific, since castellan # can use different backends. Rather than importing all possible # backends here, the generic "Exception" is used. msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s") % {'id': signature_certificate_uuid, 'e': encodeutils.exception_to_unicode(e)}) LOG.error(msg) raise exception.SignatureVerificationError( reason=_('Unable to retrieve certificate with ID: %s') % signature_certificate_uuid) if cert.format not in CERTIFICATE_FORMATS: raise exception.SignatureVerificationError( reason=_('Invalid certificate format: %s') % cert.format) if cert.format == X_509: # castellan always encodes certificates in DER format cert_data = cert.get_encoded() certificate = x509.load_der_x509_certificate(cert_data, default_backend()) return certificate
def _get_client_identity(self): certificate_data = self._connection.getpeercert(binary_form=True) try: certificate = x509.load_der_x509_certificate( certificate_data, backends.default_backend() ) except Exception: # This should never get raised "in theory," as the ssl socket # should fail to connect non-TLS connections before the session # gets created. This is a failsafe in case that protection fails. raise exceptions.PermissionDenied( "Failure loading the client certificate from the session " "connection. Could not retrieve client identity." ) try: extended_key_usage = certificate.extensions.get_extension_for_oid( x509.oid.ExtensionOID.EXTENDED_KEY_USAGE ).value except x509.ExtensionNotFound: raise exceptions.PermissionDenied( "The extended key usage extension is missing from the client " "certificate. Session client identity unavailable." ) if x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH in extended_key_usage: client_identities = certificate.subject.get_attributes_for_oid( x509.oid.NameOID.COMMON_NAME ) if len(client_identities) > 0: if len(client_identities) > 1: self._logger.warning( "Multiple client identities found. Using the first " "one processed." ) client_identity = client_identities[0].value self._logger.info( "Session client identity: {0}".format(client_identity) ) return client_identity else: raise exceptions.PermissionDenied( "The client certificate does not define a subject common " "name. Session client identity unavailable." ) raise exceptions.PermissionDenied( "The extended key usage extension is not marked for client " "authentication in the client certificate. Session client " "identity unavailable." )
def fetch_details(self, crtsh_ids): rows = self._engine.execute(""" SELECT c.id, c.certificate, array_agg(DISTINCT cc.ca_owner) FROM certificate c INNER JOIN ca_certificate cac ON c.issuer_ca_id = cac.ca_id INNER JOIN ccadb_certificate cc ON cac.certificate_id = cc.certificate_id WHERE c.id IN %s GROUP BY c.id, c.certificate """, [(tuple(crtsh_ids),)]).fetchall() details = [] for row in rows: cert = x509.load_der_x509_certificate( bytes(row[1]), default_backend() ) subject_cn = cert.subject.get_attributes_for_oid( x509.NameOID.COMMON_NAME ) issuer_cn = cert.issuer.get_attributes_for_oid( x509.NameOID.COMMON_NAME ) try: san = cert.extensions.get_extension_for_class( x509.SubjectAlternativeName ) except x509.ExtensionNotFound: san_domains = None else: san_domains = san.value.get_values_for_type(x509.DNSName) details.append(RawCertificateDetails( crtsh_id=row[0], common_name=", ".join(a.value for a in subject_cn) if subject_cn else None, san_dns_names=san_domains, ccadb_owners=[o for o in row[2] if o is not None], issuer_common_name=", ".join(a.value for a in issuer_cn) if issuer_cn else None, expiration_date=cert.not_valid_after, )) return details
def _issue_cert(self, client, server_name): """ Issue a new cert for a particular name. """ log.info( 'Requesting a certificate for {server_name!r}.', server_name=server_name) key = self._generate_key() objects = [ Key(key.private_bytes( encoding=serialization.Encoding.PEM, format=serialization.PrivateFormat.TraditionalOpenSSL, encryption_algorithm=serialization.NoEncryption()))] def answer_and_poll(authzr): def got_challenge(stop_responding): return ( poll_until_valid(authzr, self._clock, client) .addBoth(tap(lambda _: stop_responding()))) return ( answer_challenge(authzr, client, self._responders) .addCallback(got_challenge)) def got_cert(certr): objects.append( Certificate( x509.load_der_x509_certificate( certr.body, default_backend()) .public_bytes(serialization.Encoding.PEM))) return certr def got_chain(chain): for certr in chain: got_cert(certr) log.info( 'Received certificate for {server_name!r}.', server_name=server_name) return objects return ( client.request_challenges(fqdn_identifier(server_name)) .addCallback(answer_and_poll) .addCallback(lambda ign: client.request_issuance( CertificateRequest( csr=csr_for_names([server_name], key)))) .addCallback(got_cert) .addCallback(client.fetch_chain) .addCallback(got_chain) .addCallback(partial(self.cert_store.store, server_name)))