我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cryptography.x509.NameAttribute()。
def enroll(self, enrollment_id, enrollment_secret):
    """Enroll a registered identity and collect its signed certificate.

    A new private key is generated, a CSR whose subject is just the
    enrollment ID as common name is built from it, and the PEM text of
    that CSR is submitted to the CA client with the credentials.

    Args:
        enrollment_id (str): The registered ID to use for enrollment
        enrollment_secret (str): The secret associated with the
            enrollment ID

    Returns:
        An ``Enrollment`` built from the new private key and whatever
        the CA client returned for the request.

    Raises:
        RequestException: errors in requests.exceptions
        ValueError: Failed response, json parse error, args missing
        (both as stated by the original author; they would originate in
        the CA client, which is not visible here)
    """
    key = self._crypto.generate_private_key()

    common_name = x509.NameAttribute(
        NameOID.COMMON_NAME, six.u(enrollment_id))
    request = self._crypto.generate_csr(key, x509.Name([common_name]))
    request_pem = request.public_bytes(Encoding.PEM).decode("utf-8")

    certificate = self._ca_client.enroll(
        enrollment_id, enrollment_secret, request_pem)
    return Enrollment(key, certificate)
def generate_cert(key_path, cert_out_path, valid_days=3650):
    """Create a self-signed "PrivCount" certificate and write it as PEM.

    The private key is loaded from ``key_path`` via
    ``load_private_key_file``; the certificate carries the matching
    public key, a random UUID-derived serial number, and a critical
    BasicConstraints extension with ``ca=False``.

    Fixes over the previous version:

    * The expiry used to be the fixed date 2020-01-01.  Once "yesterday"
      is later than that date the builder refuses the validity window
      (not-after earlier than not-before), so the function could no
      longer produce a certificate.  The expiry is now relative to the
      current time.
    * The output was written with the Python 2 ``print >>file`` form,
      which fails on Python 3.  The PEM bytes are now written directly.

    :param key_path: path of the private key file to load.
    :param cert_out_path: path the PEM certificate is written to
        (opened in binary mode, overwriting any existing file).
    :param valid_days: number of days from now until the certificate
        expires (new, optional; defaults to 3650).
    """
    private_key = load_private_key_file(key_path)
    public_key = private_key.public_key()

    now = datetime.datetime.today()
    subject = x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount User'),
    ])
    issuer = x509.Name([
        x509.NameAttribute(x509.OID_COMMON_NAME, u'PrivCount Authority'),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject)
    builder = builder.issuer_name(issuer)
    # Back-date by one day, as before, to tolerate clock skew between peers.
    builder = builder.not_valid_before(now - datetime.timedelta(days=1))
    builder = builder.not_valid_after(
        now + datetime.timedelta(days=valid_days))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(public_key)
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True)

    certificate = builder.sign(
        private_key=private_key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    with open(cert_out_path, 'wb') as outf:
        outf.write(
            certificate.public_bytes(encoding=serialization.Encoding.PEM))
def _build_x509_name(self, x509_name):
    """Convert an OpenSSL ``X509_NAME`` handle into an ``x509.Name``.

    Every entry of ``x509_name`` is read through the backend's cffi
    bindings, its value is converted to UTF-8 text and its object to a
    dotted OID string (via ``self._obj2txt``), and the resulting
    ``x509.NameAttribute`` objects are collected in entry order.

    NOTE(review): failures of the OpenSSL calls are checked with plain
    ``assert`` statements, which are skipped when Python runs with ``-O``.

    :param x509_name: cffi handle passed to the ``X509_NAME_*`` functions.
    :return: ``x509.Name`` holding one attribute per entry.
    """
    count = self._backend._lib.X509_NAME_entry_count(x509_name)
    attributes = []
    for x in range(count):
        entry = self._backend._lib.X509_NAME_get_entry(x509_name, x)
        obj = self._backend._lib.X509_NAME_ENTRY_get_object(entry)
        assert obj != self._backend._ffi.NULL
        data = self._backend._lib.X509_NAME_ENTRY_get_data(entry)
        assert data != self._backend._ffi.NULL
        # Out-parameter: ASN1_STRING_to_UTF8 stores the converted string
        # pointer in buf[0]; its return value is used below as the byte
        # length of that string.
        buf = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.ASN1_STRING_to_UTF8(buf, data)
        assert res >= 0
        assert buf[0] != self._backend._ffi.NULL
        # Register OPENSSL_free on the converted string so it is released
        # when the Python-side handle is garbage collected.
        buf = self._backend._ffi.gc(
            buf, lambda buf: self._backend._lib.OPENSSL_free(buf[0])
        )
        # Copy exactly `res` bytes out of the C buffer before decoding.
        value = self._backend._ffi.buffer(buf[0], res)[:].decode('utf8')
        oid = self._obj2txt(obj)
        attributes.append(
            x509.NameAttribute(
                x509.ObjectIdentifier(oid), value
            )
        )
    return x509.Name(attributes)
def create_csr(key, domains, must_staple=False):
    """
    Build a SHA-256 signed CSR covering ``domains`` and return it through
    ``export_csr_for_acme``.

    The first domain becomes the subject common name; every domain is
    listed as a DNS entry in a non-critical SubjectAlternativeName
    extension.  With ``must_staple`` set, a non-critical TLS Feature
    extension requesting ``status_request`` is added as well.

    The original description says the result is in DER format; the
    encoding itself is done by ``export_csr_for_acme`` (not visible here).
    """
    assert domains

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    if must_staple:
        staple_feature = x509.TLSFeature(
            features=[x509.TLSFeatureType.status_request])
        builder = builder.add_extension(staple_feature, critical=False)

    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
def _create_self_signed_certificate(self, private_key):
    """Return a self-signed test certificate for ``private_key``.

    Subject and issuer are the same fixed name ("Test Certificate" of
    "My Company", San Francisco, CA, US).  The certificate has a random
    serial number, is valid for 10 days starting now (UTC) and is signed
    with SHA-256.

    The builder is populated through its fluent setter methods rather
    than by passing private constructor keywords, so every field goes
    through the library's own validation, and the clock is read once so
    both validity bounds derive from the same instant.

    :param private_key: key whose public half is certified and which
        signs the certificate.
    :return: the signed certificate object.
    """
    name = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"CA"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, u"San Francisco"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"My Company"),
        x509.NameAttribute(NameOID.COMMON_NAME, u"Test Certificate"),
    ])
    issued_at = datetime.utcnow()

    cert_builder = (
        x509.CertificateBuilder()
        .issuer_name(name)
        .subject_name(name)
        .public_key(private_key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(issued_at)
        .not_valid_after(issued_at + timedelta(days=10))
    )
    return cert_builder.sign(private_key, hashes.SHA256(), default_backend())
def _check_or_add_cert(self, name, domain, key, authorization):
    """Return the certificate for ``domain``, requesting one if missing.

    If ``domain`` already has a ``'certificate'`` entry it is returned
    untouched.  Otherwise a CSR with ``name`` as common name is signed
    with ``key`` (SHA-256), a certificate is requested from the client
    using the account key ``self._key``, the result is stored under
    ``domain['certificate']`` and the configuration is written out.

    ``authorization`` is accepted but not used by this method.
    """
    if 'certificate' in domain:
        return domain['certificate']

    self._log('domain:{}: generating CSR...', name)
    subject = x509.Name([
        x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name),
    ])
    request_builder = x509.CertificateSigningRequestBuilder()
    request_builder = request_builder.subject_name(subject)
    request = request_builder.sign(key, hashes.SHA256(), backend)
    self._log('domain:{}: done', name)

    self._log('domain:{}: requesting certificate...', name)
    issued = self._client.new_certificate(self._key, request)
    domain['certificate'] = issued
    # Persist right away so the new certificate survives a restart.
    self._write_config()
    self._log('domain:{}: done: {}', name, issued)
    return issued
def generate_wildcard_pem_bytes():
    """
    Produce a self-signed certificate whose subject and issuer common name
    is ``*``, together with its freshly generated RSA private key.

    The certificate is valid from one day ago until 3650 days from now.

    https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate

    :return: PEM bytes: the unencrypted private key (TraditionalOpenSSL
        format) immediately followed by the certificate.
    """
    key = generate_private_key(u'rsa')
    wildcard = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, u'*')])

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(wildcard)
    builder = builder.subject_name(wildcard)
    builder = builder.not_valid_before(datetime.today() - timedelta(days=1))
    builder = builder.not_valid_after(datetime.now() + timedelta(days=3650))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(key.public_key())
    cert = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    cert_pem = cert.public_bytes(serialization.Encoding.PEM)
    return key_pem + cert_pem
def test_ecies_generate_csr(self):
    """A CSR generated by ``ecies`` serializes to a PEM certificate request."""
    crypto = ecies()
    key = crypto.generate_private_key()
    subject = x509.Name(
        [x509.NameAttribute(NameOID.COMMON_NAME, u"test")])

    request = crypto.generate_csr(key, subject)
    pem = request.public_bytes(Encoding.PEM)

    expected_header = b"-----BEGIN CERTIFICATE REQUEST-----"
    self.assertTrue(pem.startswith(expected_header))
def _decode_x509_name_entry(backend, x509_name_entry):
    """Turn one OpenSSL name entry into an ``x509.NameAttribute``.

    The entry's object and data pointers are fetched through the
    backend bindings and checked against NULL with
    ``backend.openssl_assert``; the data is converted with
    ``_asn1_string_to_utf8`` and the object with ``_obj2txt``.
    """
    lib = backend._lib
    null = backend._ffi.NULL

    asn1_object = lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(asn1_object != null)
    asn1_string = lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(asn1_string != null)

    text = _asn1_string_to_utf8(backend, asn1_string)
    dotted_oid = _obj2txt(backend, asn1_object)
    return x509.NameAttribute(x509.ObjectIdentifier(dotted_oid), text)
def generate_self_signed_cert(cert_file, key_file):
    # type: (Any, Any) -> None
    """Write a new RSA key and a matching self-signed certificate.

    A 2048-bit RSA key (public exponent 65537) is generated and used to
    sign a non-CA certificate for ``cryptography.io`` that is valid from
    one day ago until 3650 days from now.

    Args:
        cert_file: Writable binary file-like object that receives the
            PEM certificate
        key_file: Writable binary file-like object that receives the
            unencrypted PEM private key (TraditionalOpenSSL format)
    """
    one_day = timedelta(1, 0, 0)
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend())
    public_key = private_key.public_key()

    # Self-signed: subject and issuer carry the same single common name.
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'cryptography.io'),
    ])
    constraints = x509.BasicConstraints(ca=False, path_length=None)

    certificate = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(datetime.today() - one_day)
        .not_valid_after(datetime.today() + timedelta(365 * 10))
        .serial_number(uuid4().int)
        .public_key(public_key)
        .add_extension(constraints, critical=True)
        .sign(
            private_key=private_key,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )

    key_pem = private_key.private_bytes(
        Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption())
    key_file.write(key_pem)
    cert_file.write(certificate.public_bytes(Encoding.PEM))
def set_csr_if_blank(self):
    """Populate ``self.csr`` with a PEM CSR unless one is already set.

    The subject is built from ``self.get_common_name()`` plus the
    country, state, locality, organization and organizational unit of
    ``self.account``; the names from ``self.get_san_entries()`` go into a
    non-critical SubjectAlternativeName extension.  The request is signed
    with the key from ``self.get_key()`` using SHA-256.
    """
    if self.csr:
        return

    private_key = self.get_key()

    subject_fields = [
        (NameOID.COMMON_NAME, self.get_common_name()),
        (NameOID.COUNTRY_NAME, u'{}'.format(self.account.country)),
        (NameOID.STATE_OR_PROVINCE_NAME, u'{}'.format(self.account.state)),
        (NameOID.LOCALITY_NAME, u'{}'.format(self.account.locality)),
        (NameOID.ORGANIZATION_NAME,
         u'{}'.format(self.account.organization_name)),
        (NameOID.ORGANIZATIONAL_UNIT_NAME,
         u'{}'.format(self.account.organizational_unit_name)),
    ]
    subject = x509.Name(
        [x509.NameAttribute(oid, value) for oid, value in subject_fields])
    alt_names = x509.SubjectAlternativeName(self.get_san_entries())

    request = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(subject)
        .add_extension(alt_names, critical=False)
        .sign(private_key, hashes.SHA256(), default_backend())
    )
    self.csr = request.public_bytes(serialization.Encoding.PEM)
def ca_file(tmpdir):
    """
    Write a self-signed CA certificate as ``test.pem`` inside ``tmpdir``
    and return the file's path as ASCII-encoded bytes.

    The certificate is for ``pyopenssl.org``, uses a new 2048-bit RSA key,
    is marked ``ca=True`` and is valid from one day ago until one day from
    now.
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    )
    one_day = datetime.timedelta(1, 0, 0)
    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u"pyopenssl.org"),
    ])

    certificate = (
        x509.CertificateBuilder()
        .subject_name(name)
        .issuer_name(name)
        .not_valid_before(datetime.datetime.today() - one_day)
        .not_valid_after(datetime.datetime.today() + one_day)
        .serial_number(int(uuid.uuid4()))
        .public_key(key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None),
            critical=True,
        )
        .sign(
            private_key=key,
            algorithm=hashes.SHA256(),
            backend=default_backend()
        )
    )

    pem_path = tmpdir.join("test.pem")
    pem_bytes = certificate.public_bytes(
        encoding=serialization.Encoding.PEM,
    )
    pem_path.write_binary(pem_bytes)
    return str(pem_path).encode("ascii")
def create_ca_certificate(cn, key_size=4096, certify_days=365):
    """Create a self-signed CA certificate with a new RSA key.

    The certificate carries SubjectKeyIdentifier and
    AuthorityKeyIdentifier extensions (the latter pointing at its own
    name and serial), critical BasicConstraints with ``ca=True`` and
    ``path_length=0``, and critical KeyUsage allowing digital signature,
    certificate signing and CRL signing.

    :param cn: common name used for both subject and issuer.
    :param key_size: RSA key size in bits.
    :param certify_days: validity period in days, counted from now (UTC).
    :return: ``(cert, key)`` as PEM bytes; the key is unencrypted
        (TraditionalOpenSSL format).
    """
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=key_size,
        backend=default_backend())
    key_id = x509.SubjectKeyIdentifier.from_public_key(key.public_key())
    name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])
    now = datetime.datetime.utcnow()
    serial = x509.random_serial_number()

    authority_id = x509.AuthorityKeyIdentifier(
        key_id.digest, [x509.DirectoryName(name)], serial)
    constraints = x509.BasicConstraints(ca=True, path_length=0)
    usage = x509.KeyUsage(
        digital_signature=True,
        content_commitment=False,
        key_encipherment=False,
        data_encipherment=False,
        key_agreement=False,
        key_cert_sign=True,
        crl_sign=True,
        encipher_only=False,
        decipher_only=False)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.public_key(key.public_key())
    builder = builder.serial_number(serial)
    builder = builder.not_valid_before(now)
    builder = builder.not_valid_after(
        now + datetime.timedelta(days=certify_days))
    builder = builder.add_extension(key_id, critical=False)
    builder = builder.add_extension(authority_id, critical=False)
    builder = builder.add_extension(constraints, critical=True)
    builder = builder.add_extension(usage, critical=True)
    signed = builder.sign(key, hashes.SHA256(), default_backend())

    cert_pem = signed.public_bytes(serialization.Encoding.PEM)
    key_pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption())
    return cert_pem, key_pem
def _decode_x509_name_entry(backend, x509_name_entry):
    """Build an ``x509.NameAttribute`` from a single OpenSSL name entry.

    Object and data pointers come from the backend bindings and are
    NULL-checked via ``backend.openssl_assert``.  The value is decoded
    with the backend's own ``_asn1_string_to_utf8`` method and the OID
    text with the module-level ``_obj2txt`` helper.
    """
    entry_object = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(entry_object != backend._ffi.NULL)

    entry_data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(entry_data != backend._ffi.NULL)

    decoded_value = backend._asn1_string_to_utf8(entry_data)
    identifier = x509.ObjectIdentifier(_obj2txt(backend, entry_object))
    return x509.NameAttribute(identifier, decoded_value)
def dict_to_x509_name(data):
    """Build an ``x509.Name`` from a mapping of attribute names to values.

    Supported keys are ``commonName``, ``countryName``,
    ``stateOrProvinceName``, ``locality``, ``organizationName`` and
    ``organizationalUnitName``.  Attributes appear in the iteration order
    of ``data``.

    :raises ValueError: when ``data`` contains any other key.
    """
    supported = {
        'commonName': x509.NameOID.COMMON_NAME,
        'countryName': x509.NameOID.COUNTRY_NAME,
        'stateOrProvinceName': x509.NameOID.STATE_OR_PROVINCE_NAME,
        'locality': x509.NameOID.LOCALITY_NAME,
        'organizationName': x509.NameOID.ORGANIZATION_NAME,
        'organizationalUnitName': x509.NameOID.ORGANIZATIONAL_UNIT_NAME,
    }

    collected = []
    for field, text in data.items():
        oid = supported.get(field)
        if oid is None:
            raise ValueError(
                '{} is not a supported x509 name attribute'.format(field))
        collected.append(x509.NameAttribute(oid, text))
    return x509.Name(collected)
def makeSignedCert(cpub, ccn, cvdays, cserial, spriv, scert=None):
    """
    Issue a certificate for a public key, signed with the given private
    key.

    With a signing certificate, the new subject is the signer's subject
    with every common-name attribute removed and ``ccn`` appended as the
    common name; the issuer is the signer's subject.  Without one, both
    subject and issuer consist of ``ccn`` as common name only.  The
    certificate is always marked ``ca=True`` (critical BasicConstraints)
    and signed with SHA-256.

    :param cpub: Public key for which to create a certificate.
    :param ccn: Common name for the new certificate.
    :param cvdays: Number of days the new certificate is valid.
    :param cserial: The serial number for the new certificate as an int.
    :param spriv: Private key for the signing certificate.
    :param scert: Certificate used to sign the new certificate, or None
                  if no certificate is used.
    :return: The new certificate as an object.
    """
    if scert:
        inherited = [
            attr for attr in scert.subject
            if attr.oid != NameOID.COMMON_NAME
        ]
        subject = x509.Name(
            inherited + [x509.NameAttribute(NameOID.COMMON_NAME, ccn)])
        issuer = scert.subject
    else:
        subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ccn)])
        issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, ccn)])

    lifetime = datetime.timedelta(cvdays, 0, 0)
    constraints = x509.BasicConstraints(ca=True, path_length=None)

    return (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .not_valid_before(datetime.datetime.today())
        .not_valid_after(datetime.datetime.today() + lifetime)
        .serial_number(cserial)
        .public_key(cpub)
        .add_extension(constraints, critical=True)
        .sign(
            private_key=spriv,
            algorithm=hashes.SHA256(),
            backend=default_backend())
    )
def _create_oids(common_name, oids):
    """Return the list of name attributes for a CSR subject.

    The list starts with ``common_name``.  Then, for every key configured
    in ``CFG.csr.oids`` that ``OIDS_MAP`` maps to a truthy OID, one
    attribute is added whose value is ``oids[key]`` when the caller
    supplied that key, otherwise the configured default; values are
    passed through ``str``.
    """
    attrs = [x509.NameAttribute(NameOID.COMMON_NAME, common_name)]
    for key in CFG.csr.oids:
        oid = OIDS_MAP.get(key)
        if not oid:
            # Configured key without a usable OID mapping: skip it.
            continue
        source = oids if key in oids else CFG.csr.oids
        attrs.append(x509.NameAttribute(oid, str(source[key])))
    return attrs
def serialize(self,
              country=u"US",
              state=u"CA",
              city=u"San Francisco",
              company=u"Lokey Examle",
              common_name=u"example.com"):
    """Return a PEM-encoded CSR signed with this object's private key.

    The key is obtained by loading ``self.to('pem')`` without a password
    (password support is not wired in here), and the request is signed
    with SHA-256.

    :param country: value for the country name attribute.
    :param state: value for the state-or-province attribute.
    :param city: value for the locality attribute.
    :param company: value for the organization attribute.
    :param common_name: value for the common name attribute.
    :return: the certificate signing request as PEM bytes.
    """
    key = serialization.load_pem_private_key(
        self.to('pem'),
        password=None,
        backend=default_backend())

    subject_fields = (
        (NameOID.COUNTRY_NAME, country),
        (NameOID.STATE_OR_PROVINCE_NAME, state),
        (NameOID.LOCALITY_NAME, city),
        (NameOID.ORGANIZATION_NAME, company),
        (NameOID.COMMON_NAME, common_name),
    )
    subject = x509.Name(
        [x509.NameAttribute(oid, text) for oid, text in subject_fields])

    builder = x509.CertificateSigningRequestBuilder().subject_name(subject)
    request = builder.sign(key, hashes.SHA256(), default_backend())
    return request.public_bytes(serialization.Encoding.PEM)
def _name(name):
    """Return an ``x509.Name`` made of a versioned organization and ``name``.

    The organization is ``"trustme v<__version__>"``; ``name`` is used as
    the organizational unit.
    """
    organization = x509.NameAttribute(
        NameOID.ORGANIZATION_NAME,
        u"trustme v{}".format(__version__),
    )
    unit = x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, name)
    return x509.Name([organization, unit])
def gen_certificate(key: rsa.RSAPrivateKey,
                    common_name: str,
                    *,
                    issuer: Optional[str]=None,
                    sign_key: Optional[rsa.RSAPrivateKey]=None) -> x509.Certificate:
    """Create a one-day CA certificate for ``key``.

    :param key: key whose public half is certified.
    :param common_name: subject common name.
    :param issuer: issuer common name; falls back to ``common_name``.
    :param sign_key: key used for signing; falls back to ``key``, which
        makes the certificate self-signed.
    :return: certificate valid from now (UTC) for 86400 seconds, with
        critical BasicConstraints ``ca=True, path_length=0`` and a random
        serial number, signed with SHA-256.
    """
    now = datetime.datetime.utcnow()
    subject_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
    ])
    issuer_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, issuer or common_name),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(subject_name)
    builder = builder.issuer_name(issuer_name)
    builder = builder.not_valid_before(now)
    builder = builder.not_valid_after(now + datetime.timedelta(seconds=86400))
    builder = builder.serial_number(x509.random_serial_number())
    builder = builder.public_key(key.public_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0),
        critical=True,
    )
    return builder.sign(
        private_key=sign_key or key,
        algorithm=hashes.SHA256(),
        backend=BACKEND,
    )
def MakeCASignedCert(common_name, private_key, ca_cert, ca_private_key,
                     serial_number=2, session=None):
    """Issue a non-CA certificate for ``private_key`` signed by the CA key.

    The issuer is taken from ``ca_cert.get_issuer()`` and the subject is
    ``common_name``.  Validity runs from one day before now to ten
    365-day years after now.  The raw key objects are obtained from the
    wrappers via ``get_raw_key()``, and the result is wrapped through
    ``X509Ceritifcate(session=session).from_raw_key``.
    """
    seconds_per_day = 60 * 60 * 24
    public_key = private_key.public_key()

    subject = x509.Name([
        x509.NameAttribute(oid.NameOID.COMMON_NAME, common_name),
    ])

    builder = x509.CertificateBuilder()
    builder = builder.issuer_name(ca_cert.get_issuer())
    builder = builder.subject_name(subject)

    starts_at = time.time() - seconds_per_day
    ends_at = time.time() + seconds_per_day * 365 * 10
    builder = builder.not_valid_before(
        datetime.datetime.fromtimestamp(starts_at))
    builder = builder.not_valid_after(
        datetime.datetime.fromtimestamp(ends_at))

    builder = builder.serial_number(serial_number)
    builder = builder.public_key(public_key.get_raw_key())
    builder = builder.add_extension(
        x509.BasicConstraints(ca=False, path_length=None),
        critical=True)

    signed = builder.sign(
        private_key=ca_private_key.get_raw_key(),
        algorithm=hashes.SHA256(),
        backend=openssl.backend)
    return X509Ceritifcate(session=session).from_raw_key(signed)
def _decode_x509_name_entry(backend, x509_name_entry):
    """Decode one OpenSSL name entry, keeping its ASN.1 string type.

    Besides the OID and the UTF-8 value, the entry data's ``type`` field
    is mapped through ``_ASN1_TYPE_TO_ENUM`` and passed to
    ``x509.NameAttribute`` as third argument.  Object and data pointers
    are NULL-checked with ``backend.openssl_assert``.
    """
    name_object = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(name_object != backend._ffi.NULL)
    name_data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(name_data != backend._ffi.NULL)

    text = _asn1_string_to_utf8(backend, name_data)
    dotted_oid = _obj2txt(backend, name_object)
    # Local renamed from `type` so the builtin is not shadowed.
    string_type = _ASN1_TYPE_TO_ENUM[name_data.type]
    return x509.NameAttribute(
        x509.ObjectIdentifier(dotted_oid), text, string_type)
def _generate_ca_cert(self):
    """
    Create the CA name, self-signed CA certificate and authority key
    identifier, generating an RSA CA key first if none is set.

    Sets ``self._ca_key`` (only when it is ``None``), ``self._ca_name``,
    ``self._ca_cert`` and ``self._ca_aki``.  The certificate is valid from
    one hour before ``self._now()`` until 3650 days after it.

    NOTE(review): only the key generation is treated as conditional; the
    name, certificate and key identifier are rebuilt on every call.
    Confirm against the original layout.
    """
    if self._ca_key is None:
        self._ca_key = generate_private_key(u'rsa')

    self._ca_name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'ACME Snake Oil CA')])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(self._ca_name)
    builder = builder.issuer_name(self._ca_name)
    builder = builder.not_valid_before(
        self._now() - timedelta(seconds=3600))
    builder = builder.not_valid_after(self._now() + timedelta(days=3650))
    builder = builder.public_key(self._ca_key.public_key())
    builder = builder.serial_number(int(uuid4()))
    builder = builder.add_extension(
        x509.BasicConstraints(ca=True, path_length=0),
        critical=True)
    builder = builder.add_extension(
        x509.SubjectKeyIdentifier.from_public_key(
            self._ca_key.public_key()),
        critical=False)
    self._ca_cert = builder.sign(
        private_key=self._ca_key,
        algorithm=hashes.SHA256(),
        backend=default_backend())

    self._ca_aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(
        self._ca_key.public_key())
def test_common_name_too_long(self):
    """
    When the first name is longer than a common name may be,
    `~txacme.util.csr_for_names` substitutes a dummy common name.
    """
    oversized_name = u'aaaa.' * 16
    request = csr_for_names([oversized_name], RSA_KEY_512_RAW)

    expected_subject = x509.Name([
        x509.NameAttribute(
            NameOID.COMMON_NAME,
            u'san.too.long.invalid')])
    self.assertThat(
        request,
        MatchesStructure(subject=Equals(expected_subject)))
def generate_tls_sni_01_cert(server_name, key_type=u'rsa',
                             _generate_private_key=None):
    """
    Create a self-signed certificate and key for answering a tls-sni-01
    challenge.

    The certificate's subject and issuer are ``acme.invalid``; it lists
    ``server_name`` as its only SubjectAlternativeName and is valid from
    one hour ago until one hour from now.

    :param str server_name: The SAN the certificate should have.
    :param str key_type: The type of key to generate; usually not necessary.
    :param _generate_private_key: optional replacement for
        ``generate_private_key``; called with ``key_type``.

    :rtype: ``Tuple[`~cryptography.x509.Certificate`, PrivateKey]``
    :return: A tuple of the certificate and private key.
    """
    key_factory = _generate_private_key or generate_private_key
    key = key_factory(key_type)

    name = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, u'acme.invalid')])
    alt_names = x509.SubjectAlternativeName([x509.DNSName(server_name)])

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(name)
    builder = builder.issuer_name(name)
    builder = builder.not_valid_before(
        datetime.now() - timedelta(seconds=3600))
    builder = builder.not_valid_after(
        datetime.now() + timedelta(seconds=3600))
    builder = builder.serial_number(int(uuid.uuid4()))
    builder = builder.public_key(key.public_key())
    builder = builder.add_extension(alt_names, critical=False)
    cert = builder.sign(
        private_key=key,
        algorithm=hashes.SHA256(),
        backend=default_backend())
    return (cert, key)
def csr_for_names(names, key):
    """
    Build a SHA-256 signed certificate signing request for ``names``.

    The first name is used as subject common name unless it is longer
    than 64 characters, in which case the placeholder
    ``san.too.long.invalid`` is used instead.  All names are listed as DNS
    entries in a non-critical SubjectAlternativeName extension.

    ..  seealso:: `acme.client.Client.request_issuance`

    ..  seealso:: `generate_private_key`

    :param ``List[str]`` names: One or more names (subjectAltName) for
        which to request a certificate.
    :param key: A Cryptography private key object.

    :raises ValueError: if ``names`` is empty.

    :rtype: `cryptography.x509.CertificateSigningRequest`
    :return: The certificate request message.
    """
    if len(names) < 1:
        raise ValueError('Must have at least one name')

    first = names[0]
    common_name = u'san.too.long.invalid' if len(first) > 64 else first

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, common_name)])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in names])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    return builder.sign(key, hashes.SHA256(), default_backend())
def _attribDict2x509List(attribs):
    """Flatten a mapping of attribute names into ``x509.NameAttribute`` objects.

    Each key is looked up in ``OID_MAPPING``; a value that is a list
    yields one attribute per element, any other value yields a single
    attribute.  A falsy ``attribs`` (e.g. ``None``) gives an empty list.

    :raises KeyError: if a key is missing from ``OID_MAPPING``.
    """
    mapping = attribs or OrderedDict()
    result = []
    for nice_name, values in mapping.items():
        oid = OID_MAPPING[nice_name]
        value_list = values if isinstance(values, list) else [values]
        result.extend(
            x509.NameAttribute(oid, item) for item in value_list)
    return result
def build_crl():
    """Build and sign a CRL for the newest CA and return it as PEM bytes.

    The issuer name is the CA's common name.  The list is marked as
    updated now with the next update due in one day, and contains every
    ``Certificate`` row flagged ``revoked`` whose issuer serial number
    matches the CA; each entry's revocation date is the time this
    function runs.  The CRL is signed with the CA key loaded via
    ``loadPEMKey(keyStorePath(...))`` using SHA-256.
    """
    ca = get_newest_ca()
    one_day = datetime.timedelta(1, 0, 0)

    issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, ca.common_name),
    ])
    builder = x509.CertificateRevocationListBuilder()
    builder = builder.issuer_name(issuer)
    builder = builder.last_update(datetime.datetime.today())
    builder = builder.next_update(datetime.datetime.today() + one_day)

    revoked_rows = Certificate.objects.filter(
        issuer_serial_number=ca.serial_number, revoked=True)
    for row in revoked_rows:
        logger.debug("revoked serial_number: %s", row.serial_number)
        entry_builder = x509.RevokedCertificateBuilder()
        entry_builder = entry_builder.serial_number(int(row.serial_number))
        entry_builder = entry_builder.revocation_date(
            datetime.datetime.today())
        builder = builder.add_revoked_certificate(
            entry_builder.build(default_backend()))

    crl = builder.sign(
        private_key=loadPEMKey(keyStorePath(ca.serial_number)),
        algorithm=hashes.SHA256(),
        backend=default_backend())
    return crl.public_bytes(serialization.Encoding.PEM)
def create_csr(key, domains):
    """
    Build a SHA-256 signed CSR covering ``domains`` and return it through
    ``export_csr_for_acme``.

    The first domain becomes the subject common name and every domain is
    listed as a DNS entry in a non-critical SubjectAlternativeName
    extension.

    The original description says the result is in DER format; the
    encoding itself is done by ``export_csr_for_acme`` (not visible here).
    """
    assert domains

    subject = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, domains[0]),
    ])
    alt_names = x509.SubjectAlternativeName(
        [x509.DNSName(entry) for entry in domains])

    builder = x509.CertificateSigningRequestBuilder()
    builder = builder.subject_name(subject)
    builder = builder.add_extension(alt_names, critical=False)
    signed_request = builder.sign(key, hashes.SHA256(), default_backend())
    return export_csr_for_acme(signed_request)
def keygen(pub_key, priv_key, user_id, priv_key_password=None):
    """Generate new private key and certificate RSA_SHA512_4096

    A 4096-bit RSA key is written to ``priv_key`` and a self-signed
    certificate for it, signed with SHA-512 and valid for 3650 days from
    now (UTC), is written to ``pub_key``.  Both files are PEM.

    Changes over the previous version:

    * The serial number now comes from ``x509.random_serial_number()``
      (operating-system CSPRNG, large range) instead of
      ``random.randrange`` over roughly 2**30 values, which was both
      predictable and collision-prone.
    * The clock is read once so both validity bounds derive from the same
      instant.

    :param pub_key: path the PEM certificate is written to.
    :param priv_key: path the PEM private key is written to.
    :param user_id: value of the certificate's common name.
    :param priv_key_password: if truthy, the private key is encrypted
        with it using ``BestAvailableEncryption``; otherwise the key is
        stored unencrypted.
    """
    # Generate our key
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=4096,
        backend=default_backend())

    if priv_key_password:
        ea = serialization.BestAvailableEncryption(priv_key_password)
    else:
        ea = serialization.NoEncryption()

    # Write our key to disk for safe keeping
    with open(priv_key, "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=ea,
        ))

    # Various details about who we are. For a self-signed certificate the
    # subject and issuer are always the same.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "XX"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "XX"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "XX"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "I2P Anonymous Network"),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "I2P"),
        x509.NameAttribute(NameOID.COMMON_NAME, user_id),
    ])

    issued_at = datetime.datetime.utcnow()
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .not_valid_before(issued_at)
        .not_valid_after(issued_at + datetime.timedelta(days=365 * 10))
        .serial_number(x509.random_serial_number())
        .add_extension(
            x509.SubjectKeyIdentifier.from_public_key(key.public_key()),
            critical=False,
        )
        .sign(key, hashes.SHA512(), default_backend())
    )

    with open(pub_key, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
def create_proxy_cert(loaded_cert, loaded_private_key,
                      loaded_public_key, lifetime_hours):
    """
    Create an RFC 3820 style proxy certificate.

    Given cryptography objects for an issuing certificate, a public key
    and a private key, plus a lifetime in hours, this builds a proxy
    certificate for the public key, issued by the issuing certificate's
    subject and signed (SHA-256) with the private key.

    :return: the new proxy certificate as a cryptography object.
    """
    # RFC 3820 allows many ways to pick the serial number. An
    # unpredictable one has security benefits, e.g. it makes this style
    # of attack harder: http://www.win.tue.nl/hashclash/rogue-ca
    serial = struct.unpack("<Q", os.urandom(8))[0]

    # The proxy's subject is the issuer's subject with one extra
    # CommonName appended whose value is the serial number.
    serial_cn = x509.NameAttribute(
        x509.oid.NameOID.COMMON_NAME, six.u(str(serial)))
    proxy_subject = x509.Name(list(loaded_cert.subject) + [serial_cn])

    # proxyCertInfo extension (keyUsage is deliberately not added: for
    # RFC proxies the effective usage is the intersection of the usage of
    # each cert in the chain, see RFC 3820 section 4.2).
    # The OID and DER value below were obtained by examining the output
    # of the OpenSSL call
    #     X509V3_EXT_conf(NULL, ctx, name, value)
    # with ctx set by X509V3_set_nconf(&ctx, NCONF_new(NULL)),
    #     name = "proxyCertInfo"
    #     value = "critical,language:Inherit all"
    proxy_info = x509.extensions.UnrecognizedExtension(
        x509.ObjectIdentifier("1.3.6.1.5.5.7.1.14"),
        b"0\x0c0\n\x06\x08+\x06\x01\x05\x05\x07\x15\x01")

    builder = x509.CertificateBuilder()
    builder = builder.serial_number(serial)
    # Valid from now until lifetime_hours have passed.
    builder = builder.not_valid_before(datetime.datetime.utcnow())
    builder = builder.not_valid_after(
        datetime.datetime.utcnow() +
        datetime.timedelta(hours=lifetime_hours))
    builder = builder.public_key(loaded_public_key)
    # The issuer of the proxy is the subject of the issuing certificate.
    builder = builder.issuer_name(loaded_cert.subject)
    builder = builder.subject_name(proxy_subject)
    builder = builder.add_extension(proxy_info, critical=True)

    # Sign with the issuer's private key and hand back the result.
    return builder.sign(
        private_key=loaded_private_key,
        algorithm=hashes.SHA256(),
        backend=default_backend())
def generate_ca_cert(self, cn, ou, o, expire_period=None, password=None):
    """Generate a self-signed CA certificate with a new EC key and save both.

    A SECP256K1 private key is generated; the certificate's subject and
    issuer are built from ``cn``, ``ou``, ``o`` and the fixed country
    ``"kr"``.  The serial number is the last CA index plus one, and the
    index is incremented after saving.  The certificate is stored as PEM;
    the private key is stored as unencrypted PEM (PKCS8) when no password
    is given, or as password-encrypted DER (PKCS8) otherwise.

    :param cn: subject CommonName
    :param ou: subject OrganizationalUnitName
    :param o: subject OrganizationName
    :param expire_period: validity period; defaults to the instance's
        configured CA expiry.  The original doc marks the unit as years;
        the conversion happens in ``__generate_cert`` (not visible here).
    :param password: private key encryption password.  The original doc
        mentions "8", presumably a minimum length; it is not checked
        here.
    """
    sign_pri_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
    sign_pub_key = sign_pri_key.public_key()

    name_fields = (
        (NameOID.COMMON_NAME, cn),
        (NameOID.ORGANIZATIONAL_UNIT_NAME, ou),
        (NameOID.ORGANIZATION_NAME, o),
        (NameOID.COUNTRY_NAME, "kr"),
    )
    subject_name = x509.Name(
        [x509.NameAttribute(oid, text) for oid, text in name_fields])

    serial_number = self.__LAST_CA_INDEX + 1

    key_usage = x509.KeyUsage(
        digital_signature=True,
        content_commitment=False,
        key_encipherment=True,
        data_encipherment=False,
        key_agreement=False,
        key_cert_sign=True,
        crl_sign=False,
        encipher_only=False,
        decipher_only=False)

    if expire_period is None:
        expire_period = self.__ca_expired

    # Self-signed: the subject doubles as issuer and the new key signs.
    new_cert = self.__generate_cert(
        pub_key=sign_pub_key,
        subject_name=subject_name,
        issuer_name=subject_name,
        serial_number=serial_number,
        expire_period=expire_period,
        key_usage=key_usage,
        issuer_priv=sign_pri_key)
    cert_pem = new_cert.public_bytes(encoding=serialization.Encoding.PEM)

    if password is None:
        key_encoding = serialization.Encoding.PEM
        key_protection = serialization.NoEncryption()
    else:
        key_encoding = serialization.Encoding.DER
        key_protection = serialization.BestAvailableEncryption(
            password=password)
    pri_pem = sign_pri_key.private_bytes(
        encoding=key_encoding,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=key_protection)

    self.__save(self.__CA_PATH, cert_pem, pri_pem)
    self.__LAST_CA_INDEX += 1
    self.__show_certificate(new_cert)