我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用OpenSSL.crypto.X509Req()。
def generateCertificateObjects(organization, organizationalUnit): pkey = crypto.PKey() pkey.generate_key(crypto.TYPE_RSA, 512) req = crypto.X509Req() subject = req.get_subject() subject.O = organization subject.OU = organizationalUnit req.set_pubkey(pkey) req.sign(pkey, "md5") # Here comes the actual certificate cert = crypto.X509() cert.set_serial_number(1) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(60) # Testing certificates need not be long lived cert.set_issuer(req.get_subject()) cert.set_subject(req.get_subject()) cert.set_pubkey(req.get_pubkey()) cert.sign(pkey, "md5") return pkey, req, cert
def otherMakeCertificate(**kw): keypair = PKey() keypair.generate_key(TYPE_RSA, 1024) req = X509Req() subj = req.get_subject() for (k, v) in kw.items(): setattr(subj, k, v) req.set_pubkey(keypair) req.sign(keypair, "md5") cert = X509() cert.set_serial_number(counter()) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(60 * 60 * 24 * 365) # One year cert.set_issuer(req.get_subject()) cert.set_subject(req.get_subject()) cert.set_pubkey(req.get_pubkey()) cert.sign(keypair, "md5") return keypair, cert
def __init__(self, osslpkey): self.original = osslpkey req1 = crypto.X509Req() req1.set_pubkey(osslpkey) self._emptyReq = crypto.dump_certificate_request(crypto.FILETYPE_ASN1, req1)
def requestObject(self, distinguishedName, digestAlgorithm='md5'): req = crypto.X509Req() req.set_pubkey(self.original) distinguishedName._copyInto(req.get_subject()) req.sign(self.original, digestAlgorithm) return CertificateRequest(req)
def create_certificate_request(self, pkey, digest='sha256', **name): """ Create a certificate request. Arguments: key - The key to associate with the request digest - Digestion method to use for signing, default is sha256 **name - The name of the subject of the request, possible arguments are: C - Country name ST - State or province name L - Locality name O - Organization name OU - Organizational unit name CN - Common name emailAddress - E-mail address Returns: The certificate request in an X509Req object """ request = crypto.X509Req() subject = request.get_subject() # Handle creating the subject of the request for(key,value) in name.items(): setattr(subject, key, value) request.set_pubkey(pkey) # Sign the request request.sign(pkey, digest) return request
def generate_csr_and_key(user='TestUser'): """ TestUser is the user proposed by the documentation, which will be ignored """ key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, 1024) req = crypto.X509Req() req.get_subject().CN = user req.set_pubkey(key) req.sign(key, "sha1") # print("CSR", key, req) return key, req
def generate(self, module): '''Generate the certificate signing request.''' if not self.check(module, perms_required=False) or self.force: req = crypto.X509Req() req.set_version(self.version) subject = req.get_subject() for (key, value) in self.subject.items(): if value is not None: setattr(subject, key, value) altnames = ', '.join(self.subjectAltName) extensions = [crypto.X509Extension(b"subjectAltName", False, altnames.encode('ascii'))] if self.keyUsage: usages = ', '.join(self.keyUsage) extensions.append(crypto.X509Extension(b"keyUsage", False, usages.encode('ascii'))) if self.extendedKeyUsage: usages = ', '.join(self.extendedKeyUsage) extensions.append(crypto.X509Extension(b"extendedKeyUsage", False, usages.encode('ascii'))) req.add_extensions(extensions) req.set_pubkey(self.privatekey) req.sign(self.privatekey, self.digest) self.request = req try: csr_file = open(self.path, 'wb') csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)) csr_file.close() except (IOError, OSError) as exc: raise CertificateSigningRequestError(exc) self.changed = True file_args = module.load_file_common_arguments(module.params) if module.set_fs_attributes_if_different(file_args, False): self.changed = True
def createCertRequest(pkey, digest="sha256", **name): """ Create a certificate request. Arguments: pkey - The key to associate with the request digest - Digestion method to use for signing, default is sha256 **name - The name of the subject of the request, possible arguments are: C - Country name ST - State or province name L - Locality name O - Organization name OU - Organizational unit name CN - Common name emailAddress - E-mail address Returns: The certificate request in an X509Req object """ req = crypto.X509Req() subj = req.get_subject() for key, value in name.items(): setattr(subj, key, value) req.set_pubkey(pkey) req.sign(pkey, digest) return req
def createCertRequest(pkey, extensions=[], digest="sha256", **name): """ Create a certificate request. Arguments: pkey - The key to associate with the request digest - Digestion method to use for signing, default is sha256 **name - The name of the subject of the request, possible arguments are: C - Country name ST - State or province name L - Locality name O - Organization name OU - Organizational unit name CN - Common name emailAddress - E-mail address Returns: The certificate request in an X509Req object """ req = crypto.X509Req() subj = req.get_subject() for key, value in name.items(): setattr(subj, key, value) req.add_extensions(extensions) req.set_pubkey(pkey) req.sign(pkey, digest) return req
def generate(self, module): '''Generate the certificate signing request.''' if not os.path.exists(self.path) or self.force: req = crypto.X509Req() req.set_version(self.version) subject = req.get_subject() for (key, value) in self.subject.items(): if value is not None: setattr(subject, key, value) if self.subjectAltName is not None: req.add_extensions([crypto.X509Extension(b"subjectAltName", False, self.subjectAltName.encode('ascii'))]) privatekey_content = open(self.privatekey_path).read() self.privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_content) req.set_pubkey(self.privatekey) req.sign(self.privatekey, self.digest) self.request = req try: csr_file = open(self.path, 'wb') csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request)) csr_file.close() except (IOError, OSError) as exc: raise CertificateSigningRequestError(exc) else: self.changed = False file_args = module.load_file_common_arguments(module.params) if module.set_fs_attributes_if_different(file_args, False): self.changed = True
def _generate_csr(key, key_digest, domains): csr = crypto.X509Req() csr.set_version(2) csr.set_pubkey(key) sans = ', '.join('DNS:{}'.format(d) for d in domains) exts = [crypto.X509Extension(b'subjectAltName', False, b(sans))] csr.add_extensions(exts) csr.sign(key, str(key_digest)) return ComparableX509(csr)
def requestObject(self, distinguishedName, digestAlgorithm='sha256'): req = crypto.X509Req() req.set_pubkey(self.original) distinguishedName._copyInto(req.get_subject()) req.sign(self.original, digestAlgorithm) return CertificateRequest(req)
def generateCertificateObjects(organization, organizationalUnit): """ Create a certificate for given C{organization} and C{organizationalUnit}. @return: a tuple of (key, request, certificate) objects. """ pkey = crypto.PKey() pkey.generate_key(crypto.TYPE_RSA, 512) req = crypto.X509Req() subject = req.get_subject() subject.O = organization subject.OU = organizationalUnit req.set_pubkey(pkey) req.sign(pkey, "md5") # Here comes the actual certificate cert = crypto.X509() cert.set_serial_number(1) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(60) # Testing certificates need not be long lived cert.set_issuer(req.get_subject()) cert.set_subject(req.get_subject()) cert.set_pubkey(req.get_pubkey()) cert.sign(pkey, "md5") return pkey, req, cert
def generate_certificate(config_cert, ca, cakey): # Generate the private key key = openssl_generate_privatekey(config_cert['keyfilesize']) # Generate the certificate request req = crypto.X509Req() req_subj = req.get_subject() if 'commonName' in config_cert: req_subj.commonName = config_cert['commonName'] if 'stateOrProvinceName' in config_cert: req_subj.stateOrProvinceName = config_cert['stateOrProvinceName'] if 'localityName' in config_cert: req_subj.localityName = config_cert['localityName'] if 'organizationName' in config_cert: req_subj.organizationName = config_cert['organizationName'] if 'organizationalUnitName' in config_cert: req_subj.organizationalUnitName = config_cert['organizationalUnitName'] if 'emailAddress' in config_cert: req_subj.emailAddress = config_cert['emailAddress'] if 'countryName' in config_cert: req_subj.countryName = config_cert['countryName'] req.set_pubkey(key) req.sign(key, config_cert['hashalgorithm']) # Now generate the certificate itself cert = crypto.X509() cert.set_version(2) cert.set_serial_number(config_cert['serial']) cert.set_subject(req.get_subject()) cert.set_pubkey(req.get_pubkey()) cert.set_issuer(ca.get_subject()) if 'validfrom' in config_cert: cert.set_notBefore(config_cert['validfrom']) if 'validto' in config_cert: cert.set_notAfter(config_cert['validto']) cert.add_extensions([ crypto.X509Extension("basicConstraints", True, "CA:FALSE"), crypto.X509Extension("keyUsage", False, "digitalSignature"), crypto.X509Extension("extendedKeyUsage", False, "codeSigning,msCTLSign,timeStamping,msCodeInd,msCodeCom"), crypto.X509Extension("subjectKeyIdentifier", False, "hash", subject=cert), crypto.X509Extension("authorityKeyIdentifier", False, "keyid:always", issuer=ca) ]) cert.sign(cakey, config_cert['hashalgorithm']) return req, cert, key
def generate_certificate_signing_request(conf, domain): key = load_private_key(conf, domain) LOG.info("Creating Certificate Signing Request.") csr = crypto.X509Req() csr.get_subject().countryName = domain['countryName'] csr.get_subject().CN = domain['name'] csr.set_pubkey(key) csr.sign(key, "sha1") return (csr, key)
def _create_csr(self): LOG.info('[%s] Generating CSR' % self.name) req = crypto.X509Req() LOG.debug('[%s] Attaching Certificate Version to CSR: %s' % (self.name, self.version)) req.set_version(self.version) subject = req.get_subject() for (key, value) in self.subject.items(): if value is not None: LOG.debug('[%s] Attaching %s to CSR: %s' % (self.name, key, value)) setattr(subject, key, value) LOG.info('[%s] Attaching SAN extention: %s' % (self.name, self.subjectAltName)) try: req.add_extensions([crypto.X509Extension( bytes('subjectAltName', 'utf-8'), False, bytes(self.subjectAltName, 'utf-8') )]) except TypeError: req.add_extensions([crypto.X509Extension('subjectAltName', False, self.subjectAltName)]) LOG.debug('[%s] Loading private key: %s/private/%s.key' % (self.name, self.path, self.name)) privatekey_content = open('%s/private/%s.key' % (self.path, self.name)).read() privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_content) LOG.info('[%s] Signing CSR' % self.name) req.set_pubkey(privatekey) req.sign(privatekey, self.digest) LOG.debug('[%s] Writting CSR: %s/csr/%s.csr' % (self.name, self.path, self.name)) csr_file = open('%s/csr/%s.csr' % (self.path, self.name), 'w') csr_file.write((crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)).decode('utf-8')) csr_file.close()
def _request_new_cert(self, domain, keyfile, altname=None): """ Generate a new certificate for a domain using an ACME authority """ self._init_client() # Specific logger logger = logging.getLogger('certproxy.acmeproxy.acme') # Load the private key key = crypto.load_privatekey(crypto.FILETYPE_PEM, readfile(keyfile, binary=True)) # Create a CSR req = crypto.X509Req() req.get_subject().CN = domain req.add_extensions([ crypto.X509Extension("keyUsage".encode(), False, "Digital Signature, Non Repudiation, Key Encipherment".encode()), crypto.X509Extension("basicConstraints".encode(), False, "CA:FALSE".encode()), ]) if altname: req.add_extensions([ crypto.X509Extension("subjectAltName".encode(), False, ', '.join(["DNS:{}".format(domain) for domain in altname]).encode()) ]) req.set_pubkey(key) req.sign(key, "sha256") # Validate every domain auths = [] for dom in set(altname + [domain]): logger.debug("Requesting challenges for domain %s.", dom) auth = self.client.request_domain_challenges(dom) if auth.body.status != acme.messages.STATUS_VALID: logger.debug("Domain %s not yet authorized. Processing authorization.", dom) self._process_auth(auth) else: logger.debug("Domain %s already authorized, valid till %s.", dom, auth.body.expires) auths.append(auth) # Request certificate and chain logger.debug("Requesting certificate issuance for domain %s (altname: %s).", domain, ','.join(altname)) (crt, auths) = self.client.poll_and_request_issuance(acme.jose.util.ComparableX509(req), auths) logger.debug("Requesting certificate chain for domain %s.", domain) chain = self.client.fetch_chain(crt) # Dump to PEM crt_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, crt.body) chain_pem = '\n'.join([crypto.dump_certificate(crypto.FILETYPE_PEM, link).decode() for link in chain]).encode() return (crt_pem, chain_pem)