Python OpenSSL.crypto 模块,X509Req() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用OpenSSL.crypto.X509Req()

项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def generateCertificateObjects(organization, organizationalUnit):
    """
    Build a throwaway RSA key, a certificate request and a self-signed
    certificate for the given C{organization} and C{organizationalUnit}.

    @return: a tuple of (key, request, certificate) objects.
    """
    digest = "md5"

    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 512)

    # The request carries the subject name and is signed with its own key.
    request = crypto.X509Req()
    name = request.get_subject()
    name.O = organization
    name.OU = organizationalUnit
    request.set_pubkey(key)
    request.sign(key, digest)

    # Self-signed certificate: issuer and subject are both the request's
    # subject.  It is deliberately short lived because it is only meant for
    # testing.
    certificate = crypto.X509()
    certificate.set_serial_number(1)
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(60)
    for assign_name in (certificate.set_issuer, certificate.set_subject):
        assign_name(request.get_subject())
    certificate.set_pubkey(request.get_pubkey())
    certificate.sign(key, digest)

    return key, request, certificate
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def otherMakeCertificate(**kw):
    """
    Create a fresh 1024-bit RSA key pair and a self-signed certificate whose
    subject attributes are taken from the keyword arguments.

    @return: a tuple of (key pair, certificate) objects.
    """
    one_year = 60 * 60 * 24 * 365
    digest = "md5"

    key = PKey()
    key.generate_key(TYPE_RSA, 1024)

    # Every keyword argument becomes an attribute of the subject name.
    request = X509Req()
    name = request.get_subject()
    for field in kw:
        setattr(name, field, kw[field])
    request.set_pubkey(key)
    request.sign(key, digest)

    # The serial number comes from counter(); issuer and subject are both
    # the request's subject, which makes the certificate self-signed.
    certificate = X509()
    certificate.set_serial_number(counter())
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(one_year)
    certificate.set_issuer(request.get_subject())
    certificate.set_subject(request.get_subject())
    certificate.set_pubkey(request.get_pubkey())
    certificate.sign(key, digest)

    return key, certificate
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def otherMakeCertificate(**kw):
    """
    Generate a 1024-bit RSA key pair together with a self-signed certificate.

    The keyword arguments are copied, attribute by attribute, onto the
    subject name of the certificate.

    @return: a tuple of (key pair, certificate) objects.
    """
    signature_digest = "md5"
    lifetime_seconds = 60 * 60 * 24 * 365  # One year

    rsa_key = PKey()
    rsa_key.generate_key(TYPE_RSA, 1024)

    csr = X509Req()
    csr_subject = csr.get_subject()
    for attribute in kw:
        setattr(csr_subject, attribute, kw[attribute])
    csr.set_pubkey(rsa_key)
    csr.sign(rsa_key, signature_digest)

    # Issuer == subject and the signing key is the certified key.
    x509 = X509()
    x509.set_serial_number(counter())
    x509.gmtime_adj_notBefore(0)
    x509.gmtime_adj_notAfter(lifetime_seconds)
    x509.set_issuer(csr.get_subject())
    x509.set_subject(csr.get_subject())
    x509.set_pubkey(csr.get_pubkey())
    x509.sign(rsa_key, signature_digest)

    return rsa_key, x509
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, osslpkey):
        """
        Remember the wrapped OpenSSL key and cache the ASN.1 dump of a
        certificate request that has nothing set except that key.
        """
        self.original = osslpkey
        blank_request = crypto.X509Req()
        blank_request.set_pubkey(osslpkey)
        self._emptyReq = crypto.dump_certificate_request(
            crypto.FILETYPE_ASN1, blank_request)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def requestObject(self, distinguishedName, digestAlgorithm='md5'):
        """
        Build a certificate request for C{distinguishedName}, signed with the
        wrapped key using C{digestAlgorithm}.

        @return: the new request wrapped in a L{CertificateRequest}.
        """
        key = self.original
        request = crypto.X509Req()
        request.set_pubkey(key)
        # The distinguished name writes its fields into the request subject.
        subject = request.get_subject()
        distinguishedName._copyInto(subject)
        request.sign(key, digestAlgorithm)
        return CertificateRequest(request)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def generateCertificateObjects(organization, organizationalUnit):
    """
    Create a 512-bit RSA key, a signed certificate request and a matching
    self-signed certificate for C{organization} / C{organizationalUnit}.

    @return: a tuple of (key, request, certificate) objects.
    """
    signature_digest = "md5"

    rsa_key = crypto.PKey()
    rsa_key.generate_key(crypto.TYPE_RSA, 512)

    csr = crypto.X509Req()
    csr_subject = csr.get_subject()
    csr_subject.O = organization
    csr_subject.OU = organizationalUnit
    csr.set_pubkey(rsa_key)
    csr.sign(rsa_key, signature_digest)

    # The certificate reuses the request's subject for both issuer and
    # subject; testing certificates need not be long lived.
    x509 = crypto.X509()
    x509.set_serial_number(1)
    x509.gmtime_adj_notBefore(0)
    x509.gmtime_adj_notAfter(60)
    x509.set_issuer(csr.get_subject())
    x509.set_subject(csr.get_subject())
    x509.set_pubkey(csr.get_pubkey())
    x509.sign(rsa_key, signature_digest)

    return rsa_key, csr, x509
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def create_certificate_request(self, pkey, digest='sha256', **name):
        """
        Build a certificate request and sign it with the given key.

        Arguments: pkey       - The key to associate with the request; it is
                                also used to sign it
                   digest     - Digest method to use for signing, default is
                                sha256
                   **name     - Attributes of the request subject, possible
                                arguments are:
                                 C     - Country name
                                 ST    - State or province name
                                 L     - Locality name
                                 O     - Organization name
                                 OU    - Organizational unit name
                                 CN    - Common name
                                 emailAddress - E-mail address
        Returns:   The signed certificate request as an X509Req object
        """
        csr = crypto.X509Req()

        # Copy every keyword argument onto the subject of the request.
        csr_subject = csr.get_subject()
        for field in name:
            setattr(csr_subject, field, name[field])

        csr.set_pubkey(pkey)
        csr.sign(pkey, digest)
        return csr
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def create_certificate_request(self, pkey, digest='sha256', **name):
        """
        Create a certificate request whose subject is described by ``name``.

        Arguments: pkey       - The key to associate with the request; the
                                request is signed with the same key
                   digest     - Digest method to use for signing, default is
                                sha256
                   **name     - The name of the subject of the request,
                                possible arguments are:
                                 C     - Country name
                                 ST    - State or province name
                                 L     - Locality name
                                 O     - Organization name
                                 OU    - Organizational unit name
                                 CN    - Common name
                                 emailAddress - E-mail address
        Returns:   The certificate request in an X509Req object
        """
        signing_request = crypto.X509Req()
        request_subject = signing_request.get_subject()

        # Each keyword argument is set as an attribute of the subject.
        for attribute in name:
            setattr(request_subject, attribute, name[attribute])

        signing_request.set_pubkey(pkey)
        signing_request.sign(pkey, digest)
        return signing_request
项目:utils    作者:rapydo    | 项目源码 | 文件源码
def generate_csr_and_key(user='TestUser'):
        """
        Generate a 1024-bit RSA key and a certificate request signed with it.

        The common name of the request subject is set to ``user``.  The
        default, TestUser, is the user proposed by the documentation.
        NOTE(review): the original note said this name "will be ignored";
        whatever ignores it is not visible in this function - confirm.

        Returns a (key, request) tuple.
        """
        private_key = crypto.PKey()
        private_key.generate_key(crypto.TYPE_RSA, 1024)

        csr = crypto.X509Req()
        subject = csr.get_subject()
        subject.CN = user
        csr.set_pubkey(private_key)
        csr.sign(private_key, "sha1")
        return private_key, csr
项目:docket    作者:rocknsm    | 项目源码 | 文件源码
def generate(self, module):
        '''Generate the certificate signing request.

        A new request is built, signed with ``self.privatekey`` and written
        to ``self.path`` in PEM form when
        ``self.check(module, perms_required=False)`` returns a false value
        or when ``self.force`` is set.  In every case the file attributes
        are then reconciled through ``module``; ``self.changed`` is set to
        True whenever the file or its attributes were modified.

        Raises CertificateSigningRequestError if the file cannot be written.
        '''

        if not self.check(module, perms_required=False) or self.force:
            req = crypto.X509Req()
            req.set_version(self.version)
            subject = req.get_subject()
            for (key, value) in self.subject.items():
                # Unset subject fields are skipped rather than written empty.
                if value is not None:
                    setattr(subject, key, value)

            altnames = ', '.join(self.subjectAltName)
            extensions = [crypto.X509Extension(b"subjectAltName", False, altnames.encode('ascii'))]

            if self.keyUsage:
                usages = ', '.join(self.keyUsage)
                extensions.append(crypto.X509Extension(b"keyUsage", False, usages.encode('ascii')))

            if self.extendedKeyUsage:
                usages = ', '.join(self.extendedKeyUsage)
                extensions.append(crypto.X509Extension(b"extendedKeyUsage", False, usages.encode('ascii')))

            req.add_extensions(extensions)

            req.set_pubkey(self.privatekey)
            req.sign(self.privatekey, self.digest)
            self.request = req

            try:
                # The context manager closes the handle even when the write
                # (or the PEM dump) raises.
                with open(self.path, 'wb') as csr_file:
                    csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request))
            except (IOError, OSError) as exc:
                raise CertificateSigningRequestError(exc)

            self.changed = True

        file_args = module.load_file_common_arguments(module.params)
        if module.set_fs_attributes_if_different(file_args, False):
            self.changed = True
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def createCertRequest(pkey, digest="sha256", **name):
    """
    Build a certificate request and sign it with the given key.

    Arguments: pkey   - The key to associate with the request; it is also
                        used to sign it
               digest - Digest method to use for signing, default is sha256
               **name - Attributes of the request subject, possible
                        arguments are:
                          C     - Country name
                          ST    - State or province name
                          L     - Locality name
                          O     - Organization name
                          OU    - Organizational unit name
                          CN    - Common name
                          emailAddress - E-mail address
    Returns:   The signed certificate request as an X509Req object
    """
    request = crypto.X509Req()

    # Copy every keyword argument onto the subject of the request.
    subject = request.get_subject()
    for attribute in name:
        setattr(subject, attribute, name[attribute])

    request.set_pubkey(pkey)
    request.sign(pkey, digest)
    return request
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, osslpkey):
        """
        Store the wrapped OpenSSL key and pre-compute the ASN.1 encoding of
        a certificate request that only has this key set as its public key.
        """
        self.original = osslpkey
        empty_request = crypto.X509Req()
        empty_request.set_pubkey(osslpkey)
        encoded = crypto.dump_certificate_request(
            crypto.FILETYPE_ASN1, empty_request)
        self._emptyReq = encoded
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def requestObject(self, distinguishedName, digestAlgorithm='md5'):
        """
        Create a certificate request carrying C{distinguishedName} as its
        subject and sign it with the wrapped key using C{digestAlgorithm}.

        @return: a L{CertificateRequest} wrapping the signed request.
        """
        signing_key = self.original
        csr = crypto.X509Req()
        csr.set_pubkey(signing_key)
        # _copyInto() fills the request subject from the distinguished name.
        distinguishedName._copyInto(csr.get_subject())
        csr.sign(signing_key, digestAlgorithm)
        return CertificateRequest(csr)
项目:fabric-test    作者:hyperledger    | 项目源码 | 文件源码
def createCertRequest(pkey, extensions=None, digest="sha256", **name):
    """
    Create a certificate request.
    Arguments: pkey       - The key to associate with the request; it is also
                            used to sign it
               extensions - Optional sequence of X509Extension objects to add
                            to the request, default is no extensions
               digest     - Digest method to use for signing, default is
                            sha256
               **name     - The name of the subject of the request, possible
                            arguments are:
                              C     - Country name
                              ST    - State or province name
                              L     - Locality name
                              O     - Organization name
                              OU    - Organizational unit name
                              CN    - Common name
                              emailAddress - E-mail address
    Returns:   The certificate request in an X509Req object
    """
    # A None default replaces the former shared mutable list default; a new
    # empty list per call keeps the add_extensions() call unconditional, as
    # before.
    if extensions is None:
        extensions = []

    req = crypto.X509Req()
    subj = req.get_subject()

    for key, value in name.items():
        setattr(subj, key, value)

    req.add_extensions(extensions)

    req.set_pubkey(pkey)
    req.sign(pkey, digest)
    return req
项目:ansible-nginx-load-balancer    作者:mrlesmithjr    | 项目源码 | 文件源码
def generate(self, module):
        '''Generate the certificate signing request.

        When ``self.path`` does not exist, or ``self.force`` is set, a new
        request is built, signed with the private key loaded from
        ``self.privatekey_path`` and written to ``self.path`` in PEM form.
        Otherwise ``self.changed`` is reset to False.  In both cases the
        file attributes are then reconciled through ``module`` and
        ``self.changed`` is set to True if they had to be modified.

        Raises CertificateSigningRequestError if the file cannot be written.
        '''

        if not os.path.exists(self.path) or self.force:
            req = crypto.X509Req()
            req.set_version(self.version)
            subject = req.get_subject()
            for (key, value) in self.subject.items():
                # Unset subject fields are skipped rather than written empty.
                if value is not None:
                    setattr(subject, key, value)

            if self.subjectAltName is not None:
                req.add_extensions([crypto.X509Extension(b"subjectAltName", False, self.subjectAltName.encode('ascii'))])

            # Close the key file as soon as its content has been read.
            with open(self.privatekey_path) as privatekey_file:
                privatekey_content = privatekey_file.read()
            self.privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, privatekey_content)

            req.set_pubkey(self.privatekey)
            req.sign(self.privatekey, self.digest)
            self.request = req

            try:
                # The context manager closes the handle even when the write
                # (or the PEM dump) raises.
                with open(self.path, 'wb') as csr_file:
                    csr_file.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, self.request))
            except (IOError, OSError) as exc:
                raise CertificateSigningRequestError(exc)
        else:
            self.changed = False

        file_args = module.load_file_common_arguments(module.params)
        if module.set_fs_attributes_if_different(file_args, False):
            self.changed = True
项目:wile    作者:costela    | 项目源码 | 文件源码
def _generate_csr(key, key_digest, domains):
    """
    Build a version-2 certificate request for ``key`` whose subjectAltName
    extension lists every entry of ``domains`` as a DNS name, sign it with
    ``key`` using ``key_digest`` and return it wrapped in ComparableX509.
    """
    san_entries = ['DNS:{}'.format(name) for name in domains]
    san_extension = crypto.X509Extension(
        b'subjectAltName', False, b(', '.join(san_entries)))

    request = crypto.X509Req()
    request.set_version(2)
    request.set_pubkey(key)
    request.add_extensions([san_extension])

    # The digest name is coerced to str before it is handed to sign().
    request.sign(key, str(key_digest))

    return ComparableX509(request)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def requestObject(self, distinguishedName, digestAlgorithm='sha256'):
        """
        Build a certificate request for C{distinguishedName}, signed with the
        wrapped key using C{digestAlgorithm}.

        @return: the new request wrapped in a L{CertificateRequest}.
        """
        key = self.original
        request = crypto.X509Req()
        request.set_pubkey(key)
        # The distinguished name writes its fields into the request subject.
        subject = request.get_subject()
        distinguishedName._copyInto(subject)
        request.sign(key, digestAlgorithm)
        return CertificateRequest(request)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def generateCertificateObjects(organization, organizationalUnit):
    """
    Produce a key, a certificate request and a self-signed certificate for
    the given C{organization} and C{organizationalUnit}.

    @return: a tuple of (key, request, certificate) objects.
    """
    digest = "md5"

    key = crypto.PKey()
    key.generate_key(crypto.TYPE_RSA, 512)

    # The request carries the subject name and is signed with its own key.
    request = crypto.X509Req()
    name = request.get_subject()
    name.O = organization
    name.OU = organizationalUnit
    request.set_pubkey(key)
    request.sign(key, digest)

    # Issuer and subject are both the request's subject.  Testing
    # certificates need not be long lived.
    certificate = crypto.X509()
    certificate.set_serial_number(1)
    certificate.gmtime_adj_notBefore(0)
    certificate.gmtime_adj_notAfter(60)
    certificate.set_issuer(request.get_subject())
    certificate.set_subject(request.get_subject())
    certificate.set_pubkey(request.get_pubkey())
    certificate.sign(key, digest)

    return key, request, certificate
项目:certerator    作者:stufus    | 项目源码 | 文件源码
def generate_certificate(config_cert, ca, cakey):
    """
    Create a new key pair and a certificate for it that is signed by a CA.

    Arguments: config_cert - Mapping with the certificate settings.  The
                             keys 'keyfilesize', 'hashalgorithm' and
                             'serial' are always read; the subject fields
                             (commonName, stateOrProvinceName, localityName,
                             organizationName, organizationalUnitName,
                             emailAddress, countryName) and the 'validfrom'
                             / 'validto' timestamps are used when present
               ca          - The CA certificate; provides the issuer name
                             and the authority key identifier
               cakey       - The CA private key used to sign the certificate
    Returns:   A (request, certificate, key) tuple
    """
    # Subject attributes are applied in this order whenever they are
    # configured.
    subject_fields = (
        'commonName',
        'stateOrProvinceName',
        'localityName',
        'organizationName',
        'organizationalUnitName',
        'emailAddress',
        'countryName',
    )

    # Generate the private key
    key = openssl_generate_privatekey(config_cert['keyfilesize'])

    # Generate the certificate request
    req = crypto.X509Req()
    req_subj = req.get_subject()
    for field in subject_fields:
        if field in config_cert:
            setattr(req_subj, field, config_cert[field])

    req.set_pubkey(key)
    req.sign(key, config_cert['hashalgorithm'])

    # Now generate the certificate itself
    cert = crypto.X509()
    cert.set_version(2)
    cert.set_serial_number(config_cert['serial'])
    cert.set_subject(req.get_subject())
    cert.set_pubkey(req.get_pubkey())
    cert.set_issuer(ca.get_subject())

    if 'validfrom' in config_cert:
        cert.set_notBefore(config_cert['validfrom'])
    if 'validto' in config_cert:
        cert.set_notAfter(config_cert['validto'])

    # X509Extension takes its type name and value as bytes; byte literals
    # work on both Python 2 and Python 3.
    cert.add_extensions([
        crypto.X509Extension(b"basicConstraints", True, b"CA:FALSE"),
        crypto.X509Extension(b"keyUsage", False, b"digitalSignature"),
        crypto.X509Extension(b"extendedKeyUsage", False, b"codeSigning,msCTLSign,timeStamping,msCodeInd,msCodeCom"),
        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=cert),
        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always", issuer=ca)
    ])

    cert.sign(cakey, config_cert['hashalgorithm'])
    return req, cert, key
项目:letslambda    作者:kiddouk    | 项目源码 | 文件源码
def generate_certificate_signing_request(conf, domain):
    """
    Load the private key for ``domain`` and build a certificate signing
    request for it.

    The request subject takes its country from ``domain['countryName']`` and
    its common name from ``domain['name']``; the request is signed with the
    loaded key using sha1.

    Returns a (request, key) tuple.
    """
    private_key = load_private_key(conf, domain)

    LOG.info("Creating Certificate Signing Request.")
    request = crypto.X509Req()
    subject = request.get_subject()
    subject.countryName = domain['countryName']
    subject.CN = domain['name']
    request.set_pubkey(private_key)
    request.sign(private_key, "sha1")
    return (request, private_key)
项目:lecm    作者:Spredzy    | 项目源码 | 文件源码
def _create_csr(self):
        """
        Build the certificate signing request for this certificate and write
        it to ``<self.path>/csr/<self.name>.csr`` in PEM form.

        The request gets ``self.version``, every non-None entry of
        ``self.subject`` and a subjectAltName extension built from
        ``self.subjectAltName``.  It is signed, using ``self.digest``, with
        the private key read from ``<self.path>/private/<self.name>.key``.
        """
        LOG.info('[%s] Generating CSR' % self.name)
        req = crypto.X509Req()
        LOG.debug('[%s] Attaching Certificate Version to CSR: %s' %
                  (self.name, self.version))
        req.set_version(self.version)
        subject = req.get_subject()

        for (key, value) in self.subject.items():
            if value is not None:
                LOG.debug('[%s] Attaching %s to CSR: %s' %
                          (self.name, key, value))
                setattr(subject, key, value)

        LOG.info('[%s] Attaching SAN extention: %s' %
                 (self.name, self.subjectAltName))

        try:
            req.add_extensions([crypto.X509Extension(
                bytes('subjectAltName', 'utf-8'), False,
                bytes(self.subjectAltName, 'utf-8')
            )])
        except TypeError:
            # Fallback for when the bytes(text, encoding) form above is
            # rejected with a TypeError: pass the plain strings instead.
            req.add_extensions([crypto.X509Extension('subjectAltName', False,
                                                     self.subjectAltName)])

        LOG.debug('[%s] Loading private key: %s/private/%s.key' %
                  (self.name, self.path, self.name))
        # Close the key file as soon as its content has been read.
        with open('%s/private/%s.key' %
                  (self.path, self.name)) as privatekey_file:
            privatekey_content = privatekey_file.read()

        privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM,
                                            privatekey_content)

        LOG.info('[%s] Signing CSR' % self.name)
        req.set_pubkey(privatekey)
        req.sign(privatekey, self.digest)

        LOG.debug('[%s] Writting CSR: %s/csr/%s.csr' %
                  (self.name, self.path, self.name))
        # The context manager flushes and closes the file even when the
        # dump or the write raises.
        with open('%s/csr/%s.csr' % (self.path, self.name), 'w') as csr_file:
            csr_file.write((crypto.dump_certificate_request(
                crypto.FILETYPE_PEM, req)).decode('utf-8'))
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _request_new_cert(self, domain, keyfile, altname=None):
        """ Generate a new certificate for a domain using an ACME authority

        domain is used as the common name of the request, keyfile is the
        path of the PEM private key that signs it, and altname is an
        optional iterable of additional DNS names (None or empty means no
        alternative names).

        Every requested name is authorized first, then the certificate and
        its chain are requested.  Returns a (certificate, chain) tuple, both
        PEM encoded as bytes.
        """
        self._init_client()
        # Specific logger
        logger = logging.getLogger('certproxy.acmeproxy.acme')

        # altname defaults to None; normalise it once so the list
        # concatenation and the join below cannot fail with a TypeError.
        altnames = list(altname) if altname else []

        # Load the private key
        key = crypto.load_privatekey(crypto.FILETYPE_PEM, readfile(keyfile, binary=True))

        # Collect every extension first and attach them with a single
        # add_extensions() call.
        extensions = [
            crypto.X509Extension("keyUsage".encode(), False, "Digital Signature, Non Repudiation, Key Encipherment".encode()),
            crypto.X509Extension("basicConstraints".encode(), False, "CA:FALSE".encode()),
        ]
        if altnames:
            # A distinct loop variable keeps `domain` from being shadowed.
            san = ', '.join("DNS:{}".format(name) for name in altnames)
            extensions.append(
                crypto.X509Extension("subjectAltName".encode(), False, san.encode())
            )

        # Create a CSR
        req = crypto.X509Req()
        req.get_subject().CN = domain
        req.add_extensions(extensions)
        req.set_pubkey(key)
        req.sign(key, "sha256")

        # Validate every domain
        auths = []
        for dom in set(altnames + [domain]):
            logger.debug("Requesting challenges for domain %s.", dom)
            auth = self.client.request_domain_challenges(dom)

            if auth.body.status != acme.messages.STATUS_VALID:
                logger.debug("Domain %s not yet authorized. Processing authorization.", dom)
                self._process_auth(auth)
            else:
                logger.debug("Domain %s already authorized, valid till %s.", dom, auth.body.expires)

            auths.append(auth)

        # Request certificate and chain
        logger.debug("Requesting certificate issuance for domain %s (altname: %s).", domain, ','.join(altnames))
        (crt, auths) = self.client.poll_and_request_issuance(acme.jose.util.ComparableX509(req), auths)
        logger.debug("Requesting certificate chain for domain %s.", domain)
        chain = self.client.fetch_chain(crt)

        # Dump to PEM
        crt_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, crt.body)
        chain_pem = '\n'.join([crypto.dump_certificate(crypto.FILETYPE_PEM, link).decode() for link in chain]).encode()

        return (crt_pem, chain_pem)