我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用OpenSSL.crypto.X509Extension()。
def create_ca():
    """Build a self-signed root CA certificate and its RSA private key.

    The subject is derived from the module-level ``ca_vendor``; the validity
    offsets come from ``ca_time_b`` / ``ca_time_a`` and the signing digest
    from ``ca_digest``.

    Returns:
        tuple: ``(private_key, certificate)``.
    """
    root_key = crypto.PKey()
    root_key.generate_key(crypto.TYPE_RSA, 2048)

    root = crypto.X509()
    root.set_version(2)
    root.set_serial_number(0)

    name = root.get_subject()
    subject_fields = (
        ('countryName', 'CN'),
        ('stateOrProvinceName', 'Internet'),
        ('localityName', 'Cernet'),
        ('organizationName', ca_vendor),
        ('organizationalUnitName', '%s Root' % ca_vendor),
        ('commonName', '%s CA' % ca_vendor),
    )
    for field, text in subject_fields:
        setattr(name, field, text)

    # Validity window, expressed as offsets in seconds from "now".
    root.gmtime_adj_notBefore(ca_time_b)
    root.gmtime_adj_notAfter(ca_time_a)

    # Self-signed: issuer and subject are the same name.
    root.set_issuer(name)
    root.set_pubkey(root_key)

    extension_specs = (
        (b'basicConstraints', True, b'CA:TRUE, pathlen:0'),
        (b'extendedKeyUsage', True, b'serverAuth,emailProtection,timeStamping'),
        (b'keyUsage', False, b'keyCertSign, cRLSign'),
    )
    extensions = [
        crypto.X509Extension(ext_name, is_critical, ext_value)
        for ext_name, is_critical, ext_value in extension_specs
    ]
    extensions.append(
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', subject=root))
    root.add_extensions(extensions)

    root.sign(root_key, ca_digest)
    return root_key, root
def test_construction(self):
    """
    Building an L{X509Extension} from a type name, a critical flag and a
    value yields an L{X509ExtensionType} instance.
    """
    samples = [
        ('basicConstraints', True, 'CA:true'),
        ('nsComment', False, 'pyOpenSSL unit test'),
    ]
    for type_name, critical, value in samples:
        extension = X509Extension(type_name, critical, value)
        self.assertTrue(
            isinstance(extension, X509ExtensionType),
            "%r is of type %r, should be %r" % (
                extension, type(extension), X509ExtensionType))
def test_invalid_extension(self):
    """
    L{X509Extension} raises L{Error} when given an unknown extension name
    or a value it cannot parse.
    """
    rejected = [
        # An extension name that does not exist.
        ('thisIsMadeUp', False, 'hi'),
        # A real extension with a nonsensical value.
        ('basicConstraints', False, 'blah blah'),
        # Exercise a weird one (an extension which uses the r2i method).
        # This exercises the codepath that requires a non-NULL ctx to be
        # passed to X509V3_EXT_nconf.  It can't work now because we provide
        # no configuration database.  It might be made to work in the
        # future.
        ('proxyCertInfo', True,
         'language:id-ppl-anyLanguage,pathlen:1,policy:text:AB'),
    ]
    for type_name, critical, value in rejected:
        self.assertRaises(Error, X509Extension, type_name, critical, value)
def create_group_cert(cli):
    """Create a self-signed server certificate for the group and write it out.

    A 2048-bit RSA key pair is generated and used to self-sign a certificate
    that is valid for five years and whose subjectAltName is the IP address
    in ``cli.ip_address``. Three PEM files are written, each prefixed with
    ``<cli.out_dir>/<cli.group_name>``:

    * ``-server.crt``          the certificate
    * ``-server-private.key``  the private key
    * ``-server-public.key``   the public key

    Args:
        cli: object providing ``ip_address``, ``out_dir`` and ``group_name``.
    """
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2048)  # generate RSA key-pair

    cert = crypto.X509()
    # Extensions only exist in X.509 v3 certificates; v3 is encoded as 2.
    cert.set_version(2)

    subject = cert.get_subject()
    subject.countryName = "US"
    subject.stateOrProvinceName = "CA"
    subject.organizationName = "mini-fulfillment"
    subject.organizationalUnitName = "demo"
    subject.commonName = "mini-fulfillment"

    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(5 * 365 * 24 * 60 * 60)  # 5 year expiry date
    cert.set_issuer(subject)  # self-sign this certificate
    cert.set_pubkey(k)

    san_list = ["IP:{0}".format(cli.ip_address)]
    extension_list = [
        crypto.X509Extension(
            type_name=b"basicConstraints", critical=False, value=b"CA:false"),
        # X509Extension requires byte strings, so the joined SAN text is
        # encoded before being handed over.
        crypto.X509Extension(
            type_name=b"subjectAltName", critical=True,
            value=", ".join(san_list).encode("ascii")),
    ]
    cert.add_extensions(extension_list)
    cert.sign(k, 'sha256')

    prefix = str(cli.out_dir) + '/' + cli.group_name
    # The dump_* helpers return bytes, so the files are opened in binary
    # mode; ``with`` guarantees they are flushed and closed.
    outputs = (
        ("{0}-server.crt", crypto.dump_certificate(crypto.FILETYPE_PEM, cert)),
        ("{0}-server-private.key",
         crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey=k)),
        ("{0}-server-public.key",
         crypto.dump_publickey(crypto.FILETYPE_PEM, pkey=k)),
    )
    for name_template, pem in outputs:
        with open(name_template.format(prefix), 'wb') as out:
            out.write(pem)
def test_type(self):
    """
    L{X509Extension} and L{X509ExtensionType} are the very same type object,
    and calling it creates instances of that type.
    """
    constructor_args = ('basicConstraints', True, 'CA:true')
    self.assertIdentical(X509Extension, X509ExtensionType)
    self.assertConsistentType(
        X509Extension, 'X509Extension', *constructor_args)
def test_get_critical(self):
    """
    L{X509ExtensionType.get_critical} reports the critical flag the
    extension was created with.
    """
    cases = (
        (True, self.assertTrue),
        (False, self.assertFalse),
    )
    for flag, check in cases:
        extension = X509Extension('basicConstraints', flag, 'CA:true')
        check(extension.get_critical())
def test_get_short_name(self):
    """
    L{X509ExtensionType.get_short_name} returns the short type name the
    extension was created with.
    """
    samples = (
        ('basicConstraints', 'CA:true'),
        ('nsComment', 'foo bar'),
    )
    for type_name, value in samples:
        extension = X509Extension(type_name, True, value)
        self.assertEqual(extension.get_short_name(), type_name)
def test_subject(self):
    """
    For an extension that needs a subject, the C{subject} argument of
    L{X509Extension} supplies it.
    """
    certificate = self.x509
    key_identifier = X509Extension(
        'subjectKeyIdentifier', False, 'hash', subject=certificate)
    certificate.add_extensions([key_identifier])
    certificate.sign(self.pkey, 'sha1')
    rendered = dump_certificate(FILETYPE_TEXT, certificate)
    self.assertTrue('X509v3 Subject Key Identifier:' in rendered)
def test_missing_subject(self):
    """
    Creating an extension that needs a subject without passing C{subject}
    raises L{Error}.
    """
    needs_subject = ('subjectKeyIdentifier', False, 'hash')
    self.assertRaises(Error, X509Extension, *needs_subject)
def test_invalid_subject(self):
    """
    L{TypeError} is raised when C{subject} is anything other than an
    L{X509} instance.
    """
    positional = ('basicConstraints', False, 'CA:TRUE')
    bad_subjects = [True, object(), "hello", [], self]
    for candidate in bad_subjects:
        self.assertRaises(
            TypeError, X509Extension, *positional, subject=candidate)
def test_unused_issuer(self):
    """
    Supplying C{issuer} to L{X509Extension} for an extension type that has
    no use for it is accepted, and the argument is ignored.
    """
    certificate = self.x509
    constraints = X509Extension(
        'basicConstraints', False, 'CA:TRUE', issuer=certificate)
    certificate.add_extensions([constraints])
    certificate.sign(self.pkey, 'sha1')
    rendered = dump_certificate(FILETYPE_TEXT, certificate)
    for fragment in ('X509v3 Basic Constraints:', 'CA:TRUE'):
        self.assertTrue(fragment in rendered)
def test_issuer(self):
    """
    For an extension that needs an issuer, the C{issuer} argument of
    L{X509Extension} supplies it.
    """
    certificate = self.x509
    authority_identifier = X509Extension(
        'authorityKeyIdentifier', False, 'issuer:always',
        issuer=certificate)
    certificate.add_extensions([authority_identifier])
    certificate.sign(self.pkey, 'sha1')
    rendered = dump_certificate(FILETYPE_TEXT, certificate)
    expected_fragments = (
        'X509v3 Authority Key Identifier:',
        'DirName:/CN=Yoda root CA',
    )
    for fragment in expected_fragments:
        self.assertTrue(fragment in rendered)
def test_invalid_issuer(self):
    """
    L{TypeError} is raised when C{issuer} is anything other than an
    L{X509} instance.
    """
    positional = (
        'authorityKeyIdentifier', False, 'keyid:always,issuer:always')
    bad_issuers = [True, object(), "hello", [], self]
    for candidate in bad_issuers:
        self.assertRaises(
            TypeError, X509Extension, *positional, issuer=candidate)
def generate(self, module):
    '''Generate the certificate signing request.

    A new request is built and written to ``self.path`` when ``self.check``
    reports the existing file as not up to date, or when ``self.force`` is
    set. File attributes are then reconciled through ``module``.

    Sets ``self.request`` to the new request and ``self.changed`` to True
    when a file was written or its attributes were modified.

    Raises:
        CertificateSigningRequestError: the request could not be written.
    '''
    if not self.check(module, perms_required=False) or self.force:
        req = crypto.X509Req()
        req.set_version(self.version)
        subject = req.get_subject()
        for (key, value) in self.subject.items():
            if value is not None:
                setattr(subject, key, value)

        extensions = []
        # Only emit subjectAltName when names were supplied: joining None
        # raises TypeError and an empty value cannot form a valid extension.
        if self.subjectAltName:
            altnames = ', '.join(self.subjectAltName)
            extensions.append(crypto.X509Extension(
                b"subjectAltName", False, altnames.encode('ascii')))
        if self.keyUsage:
            usages = ', '.join(self.keyUsage)
            extensions.append(crypto.X509Extension(
                b"keyUsage", False, usages.encode('ascii')))
        if self.extendedKeyUsage:
            usages = ', '.join(self.extendedKeyUsage)
            extensions.append(crypto.X509Extension(
                b"extendedKeyUsage", False, usages.encode('ascii')))
        if extensions:
            req.add_extensions(extensions)

        req.set_pubkey(self.privatekey)
        req.sign(self.privatekey, self.digest)
        self.request = req

        pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)
        try:
            # ``with`` closes the handle even when the write itself fails.
            with open(self.path, 'wb') as csr_file:
                csr_file.write(pem)
        except (IOError, OSError) as exc:
            raise CertificateSigningRequestError(exc)

        self.changed = True

    file_args = module.load_file_common_arguments(module.params)
    if module.set_fs_attributes_if_different(file_args, False):
        self.changed = True
def test_subject_key_identifier(self):
    """The CA extension at index 2 is a non-critical subjectKeyIdentifier."""
    authority = self._create_ca()
    actual = authority.x509.get_extension(2)
    self.assertEqual(actual.get_short_name().decode(), 'subjectKeyIdentifier')
    self.assertEqual(actual.get_critical(), False)
    # Rebuild the extension from the same certificate and compare raw data.
    expected = crypto.X509Extension(
        b'subjectKeyIdentifier', False, b'hash', subject=authority.x509)
    self.assertEqual(actual.get_data(), expected.get_data())
def test_authority_key_identifier(self):
    """The CA extension at index 3 is a non-critical authorityKeyIdentifier."""
    authority = self._create_ca()
    actual = authority.x509.get_extension(3)
    self.assertEqual(
        actual.get_short_name().decode(), 'authorityKeyIdentifier')
    self.assertEqual(actual.get_critical(), False)
    # Rebuild the extension against the CA itself and compare raw data.
    expected = crypto.X509Extension(
        b'authorityKeyIdentifier', False, b'keyid:always,issuer:always',
        issuer=authority.x509)
    self.assertEqual(actual.get_data(), expected.get_data())
def test_subject_key_identifier(self):
    """The cert extension at index 2 is a non-critical subjectKeyIdentifier."""
    issued = self._create_cert()
    x509 = issued.x509
    found = x509.get_extension(2)
    self.assertEqual(found.get_short_name().decode(), 'subjectKeyIdentifier')
    self.assertEqual(found.get_critical(), False)
    # An identifier derived from the same certificate must match exactly.
    rebuilt = crypto.X509Extension(
        b'subjectKeyIdentifier', False, b'hash', subject=x509)
    self.assertEqual(found.get_data(), rebuilt.get_data())
def test_authority_key_identifier(self):
    """The cert extension at index 3 is a non-critical authorityKeyIdentifier."""
    issued = self._create_cert()
    found = issued.x509.get_extension(3)
    self.assertEqual(
        found.get_short_name().decode(), 'authorityKeyIdentifier')
    self.assertEqual(found.get_critical(), False)
    # The identifier is rebuilt from the issuing CA certificate.
    rebuilt = crypto.X509Extension(
        b'authorityKeyIdentifier', False, b'keyid:always,issuer:always',
        issuer=issued.ca.x509)
    self.assertEqual(found.get_data(), rebuilt.get_data())
def _add_extensions(self, cert, is_server=False):
    """
    (internal use only)
    attaches the x509 extensions to ``cert`` and returns it;
    ``is_server`` additionally requests the ``serverAuth`` extended usage
    """
    extensions = [
        crypto.X509Extension(b'basicConstraints', True, b'CA:FALSE'),
        crypto.X509Extension(b'keyUsage',
                             CERT_KEYUSAGE_CRITICAL,
                             bytes_compat(CERT_KEYUSAGE_VALUE)),
    ]
    if is_server:
        extensions.append(
            crypto.X509Extension(b'extendedKeyUsage', False, b'serverAuth'))
    signing_cert = self.ca_cert
    extensions.append(
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash',
                             subject=cert))
    cert.add_extensions(extensions)
    # The authority key identifier goes in with a second call, once the
    # extensions above are already part of the certificate.
    authority_key_id = crypto.X509Extension(
        b'authorityKeyIdentifier', False, b'keyid:always,issuer:always',
        issuer=signing_cert)
    cert.add_extensions([authority_key_id])
    return cert
def self_signed_cert_gen(
        key_type=crypto.TYPE_RSA,
        key_bits=4096,
        country="US",
        state_province="California",
        locality="San Francisco",
        org="Your Company",
        org_unit="Team",
        common_name="www.domain.com",
        subject_alt_names=None,
        validity_days=10 * 365,
        digest="sha256"):
    """Generate a key pair and a matching self-signed certificate.

    Args:
        key_type: pyOpenSSL key type constant.
        key_bits: key size in bits.
        country, state_province, locality, org, org_unit, common_name:
            values for the subject's C, ST, L, O, OU and CN fields.
        subject_alt_names: optional list of alternative names, each already
            carrying its type prefix, e.g.
            ``["DNS:*.domain.com", "DNS:domain.ym"]``.
        validity_days: validity period in days, starting now.
        digest: name of the digest used to sign the certificate.

    Returns:
        tuple: ``(private_key_pem, certificate_pem)``, both PEM-encoded.
    """
    # Create a key pair
    k = crypto.PKey()
    k.generate_key(key_type, key_bits)

    # Create a self-signed cert
    cert = crypto.X509()
    # Extensions only exist in X.509 v3 certificates; v3 is encoded as 2.
    cert.set_version(2)

    subject = cert.get_subject()
    subject.C = country
    subject.ST = state_province
    subject.L = locality
    subject.O = org
    subject.OU = org_unit
    subject.CN = common_name

    if subject_alt_names:
        san_value = ", ".join(subject_alt_names).encode("utf-8")
        cert.add_extensions([
            crypto.X509Extension(b"subjectAltName", False, san_value)
        ])

    cert.set_serial_number(random.getrandbits(64))
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(validity_days * 24 * 60 * 60)
    cert.set_issuer(subject)  # self-signer
    cert.set_pubkey(k)
    # SHA-1 signatures are no longer considered secure, hence the SHA-256
    # default; callers can still pass another digest explicitly.
    cert.sign(k, digest)

    # return a tuple of the private key and the self-signed cert
    return (crypto.dump_privatekey(crypto.FILETYPE_PEM, k),
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
def _get_cert_extensions_ip_sans(self, user_name, node_name): extensions = [] if 'signer' in user_name.lower(): if ('peer' in node_name.lower() or 'orderer' in node_name.lower()): san_list = ["DNS:{0}".format(node_name)] extensions.append(crypto.X509Extension(b"subjectAltName", False, ", ".join(san_list))) return extensions
def __init__(self):
    """Create the root CA: a 2048-bit RSA key and a self-signed certificate.

    The certificate takes the current class-level serial number, which is
    then incremented, and is valid for ten years. The results are stored
    in ``self.cert`` and ``self.key``.
    """
    # CA key
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    # CA cert
    cert = crypto.X509()
    cert.set_serial_number(self.__next_serial)
    cert.set_version(2)
    cert.set_pubkey(key)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10*365*24*60*60)

    cacert_subject = cert.get_subject()
    cacert_subject.O = 'kOVHernetes'
    cacert_subject.OU = 'kOVHernetes Certificate Authority'
    cacert_subject.CN = 'kOVHernetes Root CA'
    cert.set_issuer(cacert_subject)

    # The subject key identifier has to be on the certificate before the
    # authority key identifier is built: 'keyid:always' copies it from the
    # issuer, which for a self-signed root is this same certificate.
    cert.add_extensions(
        (crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert),))

    cacert_ext = [
        # RFC 5280 requires authorityKeyIdentifier to be non-critical.
        crypto.X509Extension(b'authorityKeyIdentifier', False,
                             b'keyid:always,issuer', issuer=cert),
        crypto.X509Extension(b'basicConstraints', True, b'CA:TRUE'),
        crypto.X509Extension(b'keyUsage', True,
                             b'digitalSignature, cRLSign, keyCertSign'),
    ]
    cert.add_extensions(cacert_ext)

    # sign CA cert with CA key
    cert.sign(key, 'sha256')

    type(self).__next_serial += 1

    self.cert = cert
    self.key = key
def create_client_cert(self, key, o, cn):
    """Issue a X.509 client certificate

    Args:
        key: key pair whose public half is certified.
        o: value for the subject's O (organization) field.
        cn: value for the subject's CN (common name) field.

    Returns:
        The certificate, valid for one year and signed with the CA key.
        The class-level serial counter is incremented.
    """
    cert = crypto.X509()
    cert.set_serial_number(self.__next_serial)
    cert.set_version(2)
    cert.set_pubkey(key)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(365*24*60*60)

    cert_subject = cert.get_subject()
    cert_subject.O = o
    cert_subject.OU = 'kOVHernetes'
    cert_subject.CN = cn

    # The issuer of this certificate is the CA's subject.
    cert.set_issuer(self.cert.get_subject())

    cert_ext = [
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert),
        # The authority key identifier describes the signing CA, so it must
        # be derived from the CA certificate, not the one being issued.
        crypto.X509Extension(b'authorityKeyIdentifier', False,
                             b'keyid,issuer', issuer=self.cert),
        crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'),
        crypto.X509Extension(
            b'keyUsage', True,
            b'nonRepudiation, digitalSignature, keyEncipherment'),
        crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'),
    ]
    cert.add_extensions(cert_ext)

    # sign cert with CA key
    cert.sign(self.key, 'sha256')

    type(self).__next_serial += 1

    return cert
def create_server_cert(self, key, o, cn, san=None):
    """Issue a X.509 server certificate

    Args:
        key: key pair whose public half is certified.
        o: value for the subject's O (organization) field.
        cn: value for the subject's CN (common name) field.
        san: optional list of subjectAltName entries; they are joined with
            commas and passed through as the extension value.

    Returns:
        The certificate, valid for one year and signed with the CA key.
        The class-level serial counter is incremented.
    """
    cert = crypto.X509()
    cert.set_serial_number(self.__next_serial)
    cert.set_version(2)
    cert.set_pubkey(key)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(365*24*60*60)

    cert_subject = cert.get_subject()
    cert_subject.O = o
    cert_subject.OU = 'kOVHernetes'
    cert_subject.CN = cn

    # The issuer of this certificate is the CA's subject.
    cert.set_issuer(self.cert.get_subject())

    cert_ext = [
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert),
        # The authority key identifier describes the signing CA, so it must
        # be derived from the CA certificate, not the one being issued.
        crypto.X509Extension(b'authorityKeyIdentifier', False,
                             b'keyid,issuer:always', issuer=self.cert),
        crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'),
        crypto.X509Extension(b'keyUsage', True,
                             b'digitalSignature, keyEncipherment'),
        crypto.X509Extension(b'extendedKeyUsage', True, b'serverAuth'),
    ]
    if san:
        cert_ext.append(crypto.X509Extension(
            b'subjectAltName', False, ','.join(san).encode()))
    cert.add_extensions(cert_ext)

    # sign cert with CA key
    cert.sign(self.key, 'sha256')

    type(self).__next_serial += 1

    return cert
def create_client_pair(self, o, cn):
    """Issue a X.509 client key/certificate pair

    Args:
        o: value for the subject's O (organization) field.
        cn: value for the subject's CN (common name) field.

    Returns:
        tuple: ``(key, cert)`` - a new 2048-bit RSA key and its
        certificate, valid for one year and signed with the CA key.
        The class-level serial counter is incremented.
    """
    # key
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    # cert
    cert = crypto.X509()
    cert.set_serial_number(self.__next_serial)
    cert.set_version(2)
    cert.set_pubkey(key)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(365*24*60*60)

    cert_subject = cert.get_subject()
    cert_subject.O = o
    cert_subject.OU = 'kOVHernetes'
    cert_subject.CN = cn

    # The issuer of this certificate is the CA's subject.
    cert.set_issuer(self.cert.get_subject())

    cert_ext = [
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert),
        # The authority key identifier describes the signing CA, so it must
        # be derived from the CA certificate, not the one being issued.
        crypto.X509Extension(b'authorityKeyIdentifier', False,
                             b'keyid,issuer', issuer=self.cert),
        crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'),
        crypto.X509Extension(
            b'keyUsage', True,
            b'nonRepudiation, digitalSignature, keyEncipherment'),
        crypto.X509Extension(b'extendedKeyUsage', True, b'clientAuth'),
    ]
    cert.add_extensions(cert_ext)

    # sign cert with CA key
    cert.sign(self.key, 'sha256')

    type(self).__next_serial += 1

    return key, cert
def create_server_pair(self, o, cn, san=None):
    """Issue a X.509 server key/certificate pair

    Args:
        o: value for the subject's O (organization) field.
        cn: value for the subject's CN (common name) field.
        san: optional list of subjectAltName entries; they are joined with
            commas and passed through as the extension value.

    Returns:
        tuple: ``(key, cert)`` - a new 2048-bit RSA key and its
        certificate, valid for one year and signed with the CA key.
        The class-level serial counter is incremented.
    """
    # key
    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 2048)

    # cert
    cert = crypto.X509()
    cert.set_serial_number(self.__next_serial)
    cert.set_version(2)
    cert.set_pubkey(key)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(365*24*60*60)

    cert_subject = cert.get_subject()
    cert_subject.O = o
    cert_subject.OU = 'kOVHernetes'
    cert_subject.CN = cn

    # The issuer of this certificate is the CA's subject.
    cert.set_issuer(self.cert.get_subject())

    cert_ext = [
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash', cert),
        # The authority key identifier describes the signing CA, so it must
        # be derived from the CA certificate, not the one being issued.
        crypto.X509Extension(b'authorityKeyIdentifier', False,
                             b'keyid,issuer:always', issuer=self.cert),
        crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'),
        crypto.X509Extension(b'keyUsage', True,
                             b'digitalSignature, keyEncipherment'),
        crypto.X509Extension(b'extendedKeyUsage', True, b'serverAuth'),
    ]
    if san:
        cert_ext.append(crypto.X509Extension(
            b'subjectAltName', False, ','.join(san).encode()))
    cert.add_extensions(cert_ext)

    # sign cert with CA key
    cert.sign(self.key, 'sha256')

    type(self).__next_serial += 1

    return key, cert
def generate(self, module):
    '''Generate the certificate signing request.

    A new request is built, signed with the private key loaded from
    ``self.privatekey_path`` and written to ``self.path`` when that file
    does not exist yet or ``self.force`` is set; otherwise
    ``self.changed`` is reset to False. File attributes are then
    reconciled through ``module``, which may set ``self.changed`` to True.

    Raises:
        CertificateSigningRequestError: the request could not be written.
    '''
    if not os.path.exists(self.path) or self.force:
        req = crypto.X509Req()
        req.set_version(self.version)
        subject = req.get_subject()
        for (key, value) in self.subject.items():
            if value is not None:
                setattr(subject, key, value)

        if self.subjectAltName is not None:
            req.add_extensions([crypto.X509Extension(
                b"subjectAltName", False, self.subjectAltName.encode('ascii'))])

        # ``with`` closes the key file as soon as it has been read.
        with open(self.privatekey_path) as key_file:
            privatekey_content = key_file.read()
        self.privatekey = crypto.load_privatekey(
            crypto.FILETYPE_PEM, privatekey_content)

        req.set_pubkey(self.privatekey)
        req.sign(self.privatekey, self.digest)
        self.request = req

        pem = crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)
        try:
            # ``with`` closes the handle even when the write itself fails.
            with open(self.path, 'wb') as csr_file:
                csr_file.write(pem)
        except (IOError, OSError) as exc:
            raise CertificateSigningRequestError(exc)
    else:
        self.changed = False

    file_args = module.load_file_common_arguments(module.params)
    if module.set_fs_attributes_if_different(file_args, False):
        self.changed = True
def _create_test_cert(cert_file, key_file, subject, valid_days, serial_number):
    """Create a self-signed CA test certificate and write it with its key.

    Args:
        cert_file: path the PEM certificate is written to.
        key_file: path the PEM private key is written to.
        subject: value for the certificate's CN field.
        valid_days: validity period in days, counted from now.
        serial_number: serial number of the certificate.
    """
    # create a key pair
    k = crypto.PKey()
    k.generate_key(crypto.TYPE_RSA, 2048)

    # create a self-signed cert with some basic constraints
    cert = crypto.X509()
    cert.get_subject().CN = subject
    # Valid from one day in the past.
    cert.gmtime_adj_notBefore(-1 * 24 * 60 * 60)
    cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60)
    cert.set_version(2)
    cert.set_serial_number(serial_number)
    # Issuer and public key go in BEFORE the extensions: the
    # subjectKeyIdentifier 'hash' value is computed from the certificate's
    # public key at the moment the extension object is created.
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(k)
    cert.add_extensions([
        crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:1"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                             subject=cert),
    ])
    # 'keyid:always' copies the subject key identifier added just above,
    # so this extension needs its own, later add_extensions call.
    cert.add_extensions([
        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
                             issuer=cert)
    ])
    cert.sign(k, 'sha256')

    cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('ascii')
    key_str = crypto.dump_privatekey(crypto.FILETYPE_PEM, k).decode('ascii')

    with open(cert_file, 'w') as cert_out:
        cert_out.write(cert_str)
    with open(key_file, 'w') as key_out:
        key_out.write(key_str)
def _create_verification_cert(cert_file, key_file, verification_file, nonce, valid_days, serial_number): if exists(cert_file) and exists(key_file): # create a key pair public_key = crypto.PKey() public_key.generate_key(crypto.TYPE_RSA, 2046) # open the root cert and key signing_cert = crypto.load_certificate(crypto.FILETYPE_PEM, open(cert_file).read()) k = crypto.load_privatekey(crypto.FILETYPE_PEM, open(key_file).read()) # create a cert signed by the root verification_cert = crypto.X509() verification_cert.get_subject().CN = nonce verification_cert.gmtime_adj_notBefore(0) verification_cert.gmtime_adj_notAfter(valid_days * 24 * 60 * 60) verification_cert.set_version(2) verification_cert.set_serial_number(serial_number) verification_cert.set_pubkey(public_key) verification_cert.set_issuer(signing_cert.get_subject()) verification_cert.add_extensions([ crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always", issuer=signing_cert) ]) verification_cert.sign(k, 'sha256') verification_cert_str = crypto.dump_certificate(crypto.FILETYPE_PEM, verification_cert).decode('ascii') open(verification_file, 'w').write(verification_cert_str)
def _generate_csr(key, key_digest, domains):
    """Build a certificate signing request covering ``domains``.

    Args:
        key: key pair used both as the request's public key and to sign it.
        key_digest: digest name for the signature (converted with ``str``).
        domains: iterable of DNS names placed in the subjectAltName
            extension.

    Returns:
        The request wrapped in ``ComparableX509``.
    """
    csr = crypto.X509Req()
    # PKCS#10 (RFC 2986) defines only version 0 for certification requests;
    # recent pyOpenSSL releases reject any other value.
    csr.set_version(0)
    csr.set_pubkey(key)
    sans = ', '.join('DNS:{}'.format(d) for d in domains)
    exts = [crypto.X509Extension(b'subjectAltName', False, b(sans))]
    csr.add_extensions(exts)
    csr.sign(key, str(key_digest))
    return ComparableX509(csr)
def generate_ca(config_ca):
    """Create a self-signed CA certificate from a configuration mapping.

    Args:
        config_ca: mapping with the required keys ``serial``,
            ``keyfilesize`` and ``hashalgorithm``. Optional keys are the
            subject fields (``commonName``, ``stateOrProvinceName``,
            ``localityName``, ``organizationName``,
            ``organizationalUnitName``, ``emailAddress``, ``countryName``)
            and the validity bounds ``validfrom`` / ``validto``.

    Returns:
        tuple: ``(ca_certificate, private_key)``.
    """
    ca = crypto.X509()
    ca.set_version(2)
    ca.set_serial_number(config_ca['serial'])

    ca_subj = ca.get_subject()
    # The order below is the order in which the fields are set on the name.
    subject_fields = (
        'commonName',
        'stateOrProvinceName',
        'localityName',
        'organizationName',
        'organizationalUnitName',
        'emailAddress',
        'countryName',
    )
    for field in subject_fields:
        if field in config_ca:
            setattr(ca_subj, field, config_ca[field])

    if 'validfrom' in config_ca:
        ca.set_notBefore(config_ca['validfrom'])
    if 'validto' in config_ca:
        ca.set_notAfter(config_ca['validto'])

    key = openssl_generate_privatekey(config_ca['keyfilesize'])

    # Issuer and public key go in BEFORE the extensions: the
    # subjectKeyIdentifier 'hash' value is computed from the certificate's
    # public key at the moment the extension object is created.
    ca.set_issuer(ca.get_subject())
    ca.set_pubkey(key)

    # X509Extension takes byte strings; b"" literals work on Python 2 and 3.
    ca.add_extensions([
        crypto.X509Extension(b"basicConstraints", True, b"CA:TRUE, pathlen:1"),
        crypto.X509Extension(b"keyUsage", False, b"keyCertSign, cRLSign"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                             subject=ca),
    ])
    # 'keyid:always' copies the subject key identifier added just above,
    # so this extension needs its own, later add_extensions call.
    ca.add_extensions([
        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
                             issuer=ca)
    ])

    ca.sign(key, config_ca['hashalgorithm'])
    return ca, key
def generate_certificate(config_cert, ca, cakey):
    """Create a key, a certificate request and a CA-signed certificate.

    Args:
        config_cert: mapping with the required keys ``serial``,
            ``keyfilesize`` and ``hashalgorithm``. Optional keys are the
            subject fields (``commonName``, ``stateOrProvinceName``,
            ``localityName``, ``organizationName``,
            ``organizationalUnitName``, ``emailAddress``, ``countryName``)
            and the validity bounds ``validfrom`` / ``validto``.
        ca: issuing CA certificate.
        cakey: private key of the CA, used to sign the certificate.

    Returns:
        tuple: ``(request, certificate, private_key)``.
    """
    # Generate the private key
    key = openssl_generate_privatekey(config_cert['keyfilesize'])

    # Generate the certificate request
    req = crypto.X509Req()
    req_subj = req.get_subject()
    # The order below is the order in which the fields are set on the name.
    subject_fields = (
        'commonName',
        'stateOrProvinceName',
        'localityName',
        'organizationName',
        'organizationalUnitName',
        'emailAddress',
        'countryName',
    )
    for field in subject_fields:
        if field in config_cert:
            setattr(req_subj, field, config_cert[field])

    req.set_pubkey(key)
    req.sign(key, config_cert['hashalgorithm'])

    # Now generate the certificate itself
    cert = crypto.X509()
    cert.set_version(2)
    cert.set_serial_number(config_cert['serial'])
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.set_issuer(ca.get_subject())

    if 'validfrom' in config_cert:
        cert.set_notBefore(config_cert['validfrom'])
    if 'validto' in config_cert:
        cert.set_notAfter(config_cert['validto'])

    # X509Extension takes byte strings; b"" literals work on Python 2 and 3.
    cert.add_extensions([
        crypto.X509Extension(b"basicConstraints", True, b"CA:FALSE"),
        crypto.X509Extension(b"keyUsage", False, b"digitalSignature"),
        crypto.X509Extension(
            b"extendedKeyUsage", False,
            b"codeSigning,msCTLSign,timeStamping,msCodeInd,msCodeCom"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                             subject=cert),
        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always",
                             issuer=ca),
    ])

    cert.sign(cakey, config_cert['hashalgorithm'])
    return req, cert, key
def create_subcert(certfile, commonname, ip=False, sans=None):
    """Issue a CA-signed certificate for ``commonname`` and save it as PEM.

    The subjectAltName lists ``commonname``, every entry of ``sans`` and,
    unless ``ip`` is true, the wildcard ``*.<commonname>``. Issuer, public
    key, validity offsets, signing key and digest come from module-level
    values (``ca_subject``, ``sub_publickey``, ``sub_time_b``,
    ``sub_time_a``, ``ca_privatekey``, ``ca_digest``).

    Args:
        certfile: path the PEM certificate is written to.
        commonname: value for the CN and O subject fields.
        ip: when true, no wildcard name is added.
        sans: optional iterable of additional alternative names.
    """
    sans = set(sans) if sans else set()

    sub = crypto.X509()
    sub.set_version(2)
    # Serial built from the seconds elapsed since ``sub_serial`` plus a
    # random fraction, scaled by 100.
    elapsed = int(time() - sub_serial)
    sub.set_serial_number(int((elapsed + random.random()) * 100))

    name = sub.get_subject()
    subject_fields = (
        ('countryName', 'CN'),
        ('stateOrProvinceName', 'Internet'),
        ('localityName', 'Cernet'),
        ('organizationalUnitName', '%s Branch' % ca_vendor),
        ('commonName', commonname),
        ('organizationName', commonname),
    )
    for field, text in subject_fields:
        setattr(name, field, text)

    # Validity window, expressed as offsets in seconds from "now".
    sub.gmtime_adj_notBefore(sub_time_b)
    sub.gmtime_adj_notAfter(sub_time_a)
    sub.set_issuer(ca_subject)
    sub.set_pubkey(sub_publickey)

    sans.add(commonname)
    if not ip:
        sans.add('*.' + commonname)
    san_value = ', '.join(['DNS: %s' % entry for entry in sans])
    alt_names = crypto.X509Extension(b'subjectAltName', True, san_value.encode())
    sub.add_extensions([alt_names])

    sub.sign(ca_privatekey, ca_digest)

    pem = crypto.dump_certificate(crypto.FILETYPE_PEM, sub)
    with open(certfile, 'wb') as fp:
        fp.write(pem)
def create_certificate(self, request, issuer_cert, issuer_key, serial, days=3650, digest='sha256',
                       extensions=None, subject_alt_names='', version=2):
    """
    Generate a certificate given the certificate request.

    Arguments:
        request           - Certificate request to sign
        issuer_cert       - The certificate of the issuer
        issuer_key        - The private key of the issuer
        serial            - The serial number to assign to the certificate
        days              - The number of days of validity (starting from now)
        digest            - The digest method for signing (by default sha256)
        extensions        - iterable of dictionaries holding the keyword
                            arguments of crypto.X509Extension. 'subject'
                            or 'issuer' set to 'self' refers to the new
                            certificate; any other 'issuer' value is
                            replaced by issuer_cert. A string 'critical'
                            is true for "yes", "true", "t" or "1".
                            The dictionaries are not modified.
        subject_alt_names - subject alt names, e.g. IP:192.168.7.1 or
                            DNS:my.domain, as one string or a list of them
        version           - X509 version value (2 means v3)

    Returns the signed certificate.
    """
    text_type = type(u'')

    def to_bytes(value):
        # X509Extension wants byte strings for its name and value.
        return value.encode('utf-8') if isinstance(value, text_type) else value

    certificate = crypto.X509()

    # Fill in identity, validity and public key BEFORE building extensions:
    # self-referential ones (e.g. a subjectKeyIdentifier hash) are computed
    # from the certificate as it is when the extension object is created.
    certificate.set_serial_number(serial)
    certificate.set_version(version)
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(days*86400)
    certificate.set_subject(request.get_subject())
    certificate.set_issuer(issuer_cert.get_subject())
    certificate.set_pubkey(request.get_pubkey())

    # Handle x509 extensions
    for extension in extensions or ():
        # Work on a copy so the caller's dictionaries can be reused.
        spec = dict(extension)

        # handle issuer and subjects that need to be self-referential (root certificate)
        if spec.get('subject') == 'self':
            spec['subject'] = certificate
        if 'issuer' in spec:
            spec['issuer'] = certificate if spec['issuer'] == 'self' else issuer_cert

        # have to explicitly set 'critical' extension to a bool.
        if 'critical' in spec and not isinstance(spec['critical'], bool):
            spec['critical'] = str(spec['critical']).lower() in ("yes", "true", "t", "1")

        for field in ('type_name', 'value'):
            if field in spec:
                spec[field] = to_bytes(spec[field])

        certificate.add_extensions([crypto.X509Extension(**spec)])

    # Handle the subject alternative names (these are just X509 extensions)
    if subject_alt_names:
        if isinstance(subject_alt_names, (bytes, text_type)):
            # A single string is already a complete value; joining it
            # would insert ", " between its characters.
            san_value = subject_alt_names
        else:
            san_value = ", ".join(subject_alt_names)
        certificate.add_extensions(
            [crypto.X509Extension(b"subjectAltName", False, to_bytes(san_value))])

    certificate.sign(issuer_key, digest)
    return certificate
def _create_certificate_chain():
    """
    Build a three-level certificate chain and return it as a list of
    ``(key, certificate)`` pairs, ordered from the root down:

    1. A self-signed certificate authority certificate
    2. An intermediate certificate signed by the authority
    3. A server certificate signed by the intermediate
    """
    def issue(common_name, extension, parent=None):
        # Create a key and certificate; self-signed when no parent is given.
        key = PKey()
        key.generate_key(TYPE_RSA, 1024)
        cert = X509()
        cert.get_subject().commonName = common_name
        signing_key, signing_cert = (key, cert) if parent is None else parent
        cert.set_issuer(signing_cert.get_subject())
        cert.set_pubkey(key)
        cert.set_notBefore(b"20000101000000Z")
        cert.set_notAfter(b"20200101000000Z")
        cert.add_extensions([extension])
        cert.set_serial_number(0)
        cert.sign(signing_key, "sha1")
        return key, cert

    # The same CA extension object is shared by root and intermediate.
    ca_extension = X509Extension(b'basicConstraints', False, b'CA:true')

    authority = issue("Authority Certificate", ca_extension)
    intermediate = issue(
        "Intermediate Certificate", ca_extension, parent=authority)
    server = issue(
        "Server Certificate",
        X509Extension(b'basicConstraints', True, b'CA:false'),
        parent=intermediate)

    return [authority, intermediate, server]
def _add_extensions(self, cert):
    """
    (internal use only)
    attaches the x509 extensions to ``cert`` and returns it;
    an object without a ``ca`` attribute is treated as a CA
    """
    is_ca = not hasattr(self, 'ca')
    if is_ca:
        # extensions for a CA certificate
        pathlen = app_settings.CA_BASIC_CONSTRAINTS_PATHLEN
        if pathlen is None:
            constraints = 'CA:TRUE'
        else:
            constraints = '{0}, pathlen:{1}'.format('CA:TRUE', pathlen)
        extensions = [
            crypto.X509Extension(b'basicConstraints',
                                 app_settings.CA_BASIC_CONSTRAINTS_CRITICAL,
                                 bytes_compat(constraints)),
            crypto.X509Extension(b'keyUsage',
                                 app_settings.CA_KEYUSAGE_CRITICAL,
                                 bytes_compat(app_settings.CA_KEYUSAGE_VALUE)),
        ]
        signing_cert = cert
    else:
        # extensions for an end-entity certificate
        extensions = [
            crypto.X509Extension(b'basicConstraints', False, b'CA:FALSE'),
            crypto.X509Extension(b'keyUsage',
                                 app_settings.CERT_KEYUSAGE_CRITICAL,
                                 bytes_compat(app_settings.CERT_KEYUSAGE_VALUE)),
        ]
        signing_cert = self.ca.x509
    extensions.append(
        crypto.X509Extension(b'subjectKeyIdentifier', False, b'hash',
                             subject=cert))
    cert.add_extensions(extensions)
    # authorityKeyIdentifier must be added after
    # the other extensions have been already added
    authority_key_id = crypto.X509Extension(
        b'authorityKeyIdentifier', False, b'keyid:always,issuer:always',
        issuer=signing_cert)
    cert.add_extensions([authority_key_id])
    # user-defined extensions, added one at a time
    for custom in self.extensions:
        custom_extension = crypto.X509Extension(
            bytes_compat(custom['name']),
            bool(custom['critical']),
            bytes_compat(custom['value']))
        cert.add_extensions([custom_extension])
    return cert
def createCertificate(req, issuerCertKey, serial, validityPeriod, digest="sha256", isCA=False, extensions=None):
    """
    Generate a certificate given a certificate request.

    Arguments: req            - Certificate request to use
               issuerCertKey  - (issuerCert, issuerKey) pair: the
                                certificate and the private key of the issuer
               serial         - Serial number for the certificate
               validityPeriod - (notBefore, notAfter) pair: timestamps
                                (relative to now) when the certificate
                                starts and stops being valid
               digest         - Digest method to use for signing, default is sha256
               isCA           - Whether to mark the certificate as a CA
               extensions     - Additional X509Extension objects to add
    Returns:   The signed certificate in an X509 object
    """
    issuerCert, issuerKey = issuerCertKey
    notBefore, notAfter = validityPeriod
    cert = crypto.X509()
    # X.509 v3 is encoded as version value 2; 3 is not a defined version.
    cert.set_version(2)
    cert.set_serial_number(serial)
    cert.gmtime_adj_notBefore(notBefore)
    cert.gmtime_adj_notAfter(notAfter)
    cert.set_issuer(issuerCert.get_subject())
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    # X509Extension takes byte strings; b"" literals work on Python 2 and 3.
    if isCA:
        cert.add_extensions([crypto.X509Extension(b"basicConstraints", True,
                                                  b"CA:TRUE, pathlen:0"),
                             crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                                                  subject=cert)])
        #TODO: This only is appropriate for root self signed!!!!
        cert.add_extensions([crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always", issuer=cert)])
    else:
        cert.add_extensions([crypto.X509Extension(b"basicConstraints", True,
                                                  b"CA:FALSE"),
                             crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash",
                                                  subject=cert)])
        cert.add_extensions([crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always", issuer=issuerCert)])

    cert.add_extensions([] if extensions is None else extensions)
    cert.sign(issuerKey, digest)
    return cert

# SUBJECT_DEFAULT = {countryName : "US", stateOrProvinceName : "NC", localityName : "RTP", organizationName : "IBM", organizationalUnitName : "Blockchain"}
def _create_certificate_chain():
    """
    Build a three-level certificate chain and return it as a list of
    ``(key, certificate)`` pairs, ordered from the root down:

    1. A self-signed certificate authority certificate
    2. An intermediate certificate signed by the authority
    3. A server certificate signed by the intermediate
    """
    # The same CA extension object is shared by root and intermediate.
    shared_ca_extension = X509Extension(
        b('basicConstraints'), False, b('CA:true'))
    levels = (
        ("Authority Certificate", True),
        ("Intermediate Certificate", True),
        ("Server Certificate", False),
    )

    chain = []
    for common_name, is_authority in levels:
        key = PKey()
        key.generate_key(TYPE_RSA, 512)
        cert = X509()
        cert.get_subject().commonName = common_name

        # The first certificate signs itself; every later one is issued
        # and signed by the entry before it.
        if chain:
            signing_key, signing_cert = chain[-1]
        else:
            signing_key, signing_cert = key, cert

        cert.set_issuer(signing_cert.get_subject())
        cert.set_pubkey(key)
        cert.set_notBefore(b("20000101000000Z"))
        cert.set_notAfter(b("20200101000000Z"))
        if is_authority:
            extension = shared_ca_extension
        else:
            extension = X509Extension(
                b('basicConstraints'), True, b('CA:false'))
        cert.add_extensions([extension])
        cert.set_serial_number(0)
        cert.sign(signing_key, "sha1")
        chain.append((key, cert))

    return chain