Python OpenSSL.SSL 模块,VERIFY_PEER 实例源码

我们从 Python 开源项目中,提取了以下 36 个代码示例,用于说明如何使用 OpenSSL.SSL.VERIFY_PEER。

项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test03_ssl_verification_of_peer_fails(self):
        """Check that a request fails when the CA directory is unusable.

        The context requires peer verification but is pointed at
        ``Constants.UNITTEST_DIR`` as its CA directory; issuing the request
        is expected to raise ``SSL.Error``.
        """
        def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
            # Log the certificate subject, then hand back OpenSSL's own
            # verdict unchanged.
            subject = x509.get_subject()
            log.debug('SSL peer certificate verification failed for %r',
                      subject)
            return preverify_ok

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ssl_ctx.set_verify_depth(9)

        # Deliberately bad location: the unit test directory has no CA
        # certificates to verify with.
        ssl_ctx.load_verify_locations(None, Constants.UNITTEST_DIR)

        https_conn = HTTPSConnection(Constants.HOSTNAME,
                                     port=Constants.PORT,
                                     ssl_context=ssl_ctx)
        https_conn.connect()
        self.assertRaises(SSL.Error, https_conn.request, 'GET', '/')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test03_ssl_verification_of_peer_succeeds(self):
        """Fetch '/' over HTTPS with peer verification enabled.

        The context loads CA certificates from ``Constants.CACERT_DIR`` and
        the response body is printed.
        """
        def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
            # Pass OpenSSL's own verdict straight through.
            return preverify_ok

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ssl_ctx.set_verify_depth(9)

        # Correct location for the CA certificates to verify with.
        ssl_ctx.load_verify_locations(None, Constants.CACERT_DIR)

        https_conn = HTTPSConnection(Constants.HOSTNAME,
                                     port=Constants.PORT,
                                     ssl_context=ssl_ctx)
        https_conn.connect()
        https_conn.request('GET', '/')
        response = https_conn.getresponse()
        body = response.read()
        print('Response = %s' % body)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test04_ssl_verification_with_subj_alt_name(self):
        """Fetch '/' over HTTPS using a ``ServerSSLCertVerification`` callback.

        The callback is obtained from a ``ServerSSLCertVerification`` built
        for hostname ``'localhost'``; the response body is printed.
        """
        verifier = ServerSSLCertVerification(hostname='localhost')
        server_cert_check = verifier.get_verify_server_cert_func()

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, server_cert_check)
        ssl_ctx.set_verify_depth(9)

        # Correct location for the CA certificates to verify with.
        ssl_ctx.load_verify_locations(None, Constants.CACERT_DIR)

        https_conn = HTTPSConnection(Constants.HOSTNAME,
                                     port=Constants.PORT,
                                     ssl_context=ssl_ctx)
        https_conn.connect()
        https_conn.request('GET', '/')
        response = https_conn.getresponse()
        body = response.read()
        print('Response = %s' % body)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _server(self, sock):
        """
        Build the server-side SSL L{Connection} wrapped around C{sock}.

        The context uses TLSv1, requires a verified peer certificate, and is
        loaded with the server key/certificate plus the root certificate as
        a trust anchor.  The returned connection is in accept state.
        """
        ctx = Context(TLSv1_METHOD)
        ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
        verify_mode = (VERIFY_PEER
                       | VERIFY_FAIL_IF_NO_PEER_CERT
                       | VERIFY_CLIENT_ONCE)
        ctx.set_verify(verify_mode, verify_cb)
        trust_store = ctx.get_cert_store()

        # Install this side's identity and make sure key and cert match.
        private_key = load_privatekey(FILETYPE_PEM, server_key_pem)
        ctx.use_privatekey(private_key)
        certificate = load_certificate(FILETYPE_PEM, server_cert_pem)
        ctx.use_certificate(certificate)
        ctx.check_privatekey()

        # Trust the root certificate when verifying the peer.
        root_certificate = load_certificate(FILETYPE_PEM, root_cert_pem)
        trust_store.add_cert(root_certificate)

        # The Connection is created here.  If None is passed as the 2nd
        # parameter, it indicates a memory BIO should be created.
        conn = Connection(ctx, sock)
        conn.set_accept_state()
        return conn
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _client(self, sock):
        """
        Build the client-side SSL L{Connection} wrapped around C{sock}.

        The context uses TLSv1, requires a verified peer certificate, and is
        loaded with the client key/certificate plus the root certificate as
        a trust anchor.  The returned connection is in connect state.
        """
        ctx = Context(TLSv1_METHOD)
        ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
        verify_mode = (VERIFY_PEER
                       | VERIFY_FAIL_IF_NO_PEER_CERT
                       | VERIFY_CLIENT_ONCE)
        ctx.set_verify(verify_mode, verify_cb)
        trust_store = ctx.get_cert_store()

        # Install this side's identity and make sure key and cert match.
        private_key = load_privatekey(FILETYPE_PEM, client_key_pem)
        ctx.use_privatekey(private_key)
        certificate = load_certificate(FILETYPE_PEM, client_cert_pem)
        ctx.use_certificate(certificate)
        ctx.check_privatekey()

        # Trust the root certificate when verifying the peer.
        root_certificate = load_certificate(FILETYPE_PEM, root_cert_pem)
        trust_store.add_cert(root_certificate)

        conn = Connection(ctx, sock)
        conn.set_connect_state()
        return conn
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def create_dest_sslcontext(insecure=False, trusted_ca_certs="", alpn=None):
    """Create the SSL context used for the destination side.

    Args:
        insecure: when true, the context is set to ``SSL.VERIFY_NONE``;
            otherwise a peer certificate is required and verified.
        trusted_ca_certs: CA location passed to ``load_verify_locations``;
            falls back to ``certifi.where()`` when empty.
        alpn: ALPN protocols to advertise; only applied when ``HAS_ALPN``
            is true.

    Returns:
        The configured context obtained from ``create_basic_sslcontext()``.
    """
    ssl_ctx = create_basic_sslcontext()

    if insecure:
        ssl_ctx.set_verify(SSL.VERIFY_NONE, certificate_verify_cb)
    else:
        ca_bundle = trusted_ca_certs if trusted_ca_certs else certifi.where()
        ssl_ctx.load_verify_locations(ca_bundle)
        verify_mode = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        ssl_ctx.set_verify(verify_mode, certificate_verify_cb)

    if alpn and HAS_ALPN:
        ssl_ctx.set_alpn_protos(alpn)

    return ssl_ctx
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _server(self, sock):
        """
        Build the server-side SSL L{Connection} wrapped around C{sock}.

        The context uses TLSv1, requires a verified peer certificate, and is
        loaded with the server key/certificate plus the root certificate as
        a trust anchor.  The returned connection is in accept state.
        """
        ctx = Context(TLSv1_METHOD)
        ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
        verify_mode = (VERIFY_PEER
                       | VERIFY_FAIL_IF_NO_PEER_CERT
                       | VERIFY_CLIENT_ONCE)
        ctx.set_verify(verify_mode, verify_cb)
        trust_store = ctx.get_cert_store()

        # Install this side's identity and make sure key and cert match.
        private_key = load_privatekey(FILETYPE_PEM, server_key_pem)
        ctx.use_privatekey(private_key)
        certificate = load_certificate(FILETYPE_PEM, server_cert_pem)
        ctx.use_certificate(certificate)
        ctx.check_privatekey()

        # Trust the root certificate when verifying the peer.
        root_certificate = load_certificate(FILETYPE_PEM, root_cert_pem)
        trust_store.add_cert(root_certificate)

        # The Connection is created here.  If None is passed as the 2nd
        # parameter, it indicates a memory BIO should be created.
        conn = Connection(ctx, sock)
        conn.set_accept_state()
        return conn
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _client(self, sock):
        """
        Build the client-side SSL L{Connection} wrapped around C{sock}.

        The context uses TLSv1, requires a verified peer certificate, and is
        loaded with the client key/certificate plus the root certificate as
        a trust anchor.  The returned connection is in connect state.
        """
        ctx = Context(TLSv1_METHOD)
        ctx.set_options(OP_NO_SSLv2 | OP_NO_SSLv3 | OP_SINGLE_DH_USE)
        verify_mode = (VERIFY_PEER
                       | VERIFY_FAIL_IF_NO_PEER_CERT
                       | VERIFY_CLIENT_ONCE)
        ctx.set_verify(verify_mode, verify_cb)
        trust_store = ctx.get_cert_store()

        # Install this side's identity and make sure key and cert match.
        private_key = load_privatekey(FILETYPE_PEM, client_key_pem)
        ctx.use_privatekey(private_key)
        certificate = load_certificate(FILETYPE_PEM, client_cert_pem)
        ctx.use_certificate(certificate)
        ctx.check_privatekey()

        # Trust the root certificate when verifying the peer.
        root_certificate = load_certificate(FILETYPE_PEM, root_cert_pem)
        trust_store.add_cert(root_certificate)

        conn = Connection(ctx, sock)
        conn.set_connect_state()
        return conn
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def test03_ssl_verification_of_peer_fails(self):
        """Check that a request fails when the CA directory is unusable.

        The context requires peer verification but is pointed at
        ``Constants.UNITTEST_DIR`` as its CA directory; issuing the request
        is expected to raise ``SSL.Error``.
        """
        def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
            # Log the certificate subject, then hand back OpenSSL's own
            # verdict unchanged.
            subject = x509.get_subject()
            log.debug('SSL peer certificate verification failed for %r',
                      subject)
            return preverify_ok

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ssl_ctx.set_verify_depth(9)

        # Deliberately bad location: the unit test directory has no CA
        # certificates to verify with.
        ssl_ctx.load_verify_locations(None, Constants.UNITTEST_DIR)

        https_conn = HTTPSConnection(Constants.HOSTNAME,
                                     port=Constants.PORT,
                                     ssl_context=ssl_ctx)
        https_conn.connect()
        self.assertRaises(SSL.Error, https_conn.request, 'GET', '/')
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def test03_ssl_verification_of_peer_succeeds(self):
        """Fetch '/' over HTTPS with peer verification enabled.

        The context loads CA certificates from ``Constants.CACERT_DIR`` and
        the response body is printed.
        """
        def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
            # Pass OpenSSL's own verdict straight through.
            return preverify_ok

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ssl_ctx.set_verify_depth(9)

        # Correct location for the CA certificates to verify with.
        ssl_ctx.load_verify_locations(None, Constants.CACERT_DIR)

        https_conn = HTTPSConnection(Constants.HOSTNAME,
                                     port=Constants.PORT,
                                     ssl_context=ssl_ctx)
        https_conn.connect()
        https_conn.request('GET', '/')
        response = https_conn.getresponse()
        body = response.read()
        print('Response = %s' % body)
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def test04_ssl_verification_with_subj_alt_name(self):
        """Fetch '/' over HTTPS using a ``ServerSSLCertVerification`` callback.

        The callback is obtained from a ``ServerSSLCertVerification`` built
        for hostname ``'localhost'``; the response body is printed.
        """
        verifier = ServerSSLCertVerification(hostname='localhost')
        server_cert_check = verifier.get_verify_server_cert_func()

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, server_cert_check)
        ssl_ctx.set_verify_depth(9)

        # Correct location for the CA certificates to verify with.
        ssl_ctx.load_verify_locations(None, Constants.CACERT_DIR)

        https_conn = HTTPSConnection(Constants.HOSTNAME,
                                     port=Constants.PORT,
                                     ssl_context=ssl_ctx)
        https_conn.connect()
        https_conn.request('GET', '/')
        response = https_conn.getresponse()
        body = response.read()
        print('Response = %s' % body)
项目:estreamer    作者:spohara79    | 项目源码 | 文件源码
def __enter__(self):
        """Open the TLS connection and return ``self``.

        Builds a TLSv1 context with this object's private key and
        certificate, then connects an SSL-wrapped TCP socket to
        ``(self.host, self.port)``.  When ``self.verify`` is set it is used
        as the CA file: peer verification is enabled with
        ``self.validate_cert`` as the callback and the certificate it
        contains is kept in ``self.trusted_cert``.
        """
        self.ctx = SSL.Context(SSL.TLSv1_METHOD)
        if self.verify:
            self.ctx.set_verify(SSL.VERIFY_PEER, self.validate_cert)
            self.ctx.load_verify_locations(self.verify)
            # ``file()`` exists only on Python 2 and left the handle open;
            # ``open`` in a ``with`` block works on 2 and 3 and closes it.
            with open(self.verify, 'rb') as ca_file:
                self.trusted_cert = crypto.load_certificate(
                    crypto.FILETYPE_PEM, ca_file.read())
        self.ctx.use_privatekey(self.pkey)
        self.ctx.use_certificate(self.cert)
        self.sock = SSL.Connection(self.ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        self.sock.connect((self.host, self.port))
        return self
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def set_peer_verification_for_url_hostname(ssl_context, url, 
                                           if_verify_enabled=False):
    """Install a peer verification callback built from the URL's hostname.

    The callback comes from a ``ServerSSLCertVerification`` constructed with
    the hostname parsed out of ``url`` and is registered with
    ``SSL.VERIFY_PEER``.  When ``if_verify_enabled`` is true, nothing is
    done unless the context already has ``SSL.VERIFY_PEER`` set.
    """
    if if_verify_enabled and not (ssl_context.get_verify_mode() & SSL.VERIFY_PEER):
        return

    target_host = urlparse_.urlparse(url).hostname
    verifier = ServerSSLCertVerification(hostname=target_host)
    ssl_context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test04_open_peer_cert_verification_fails(self):
        """Opening the test URI must raise ``SSL.Error`` with a bad CA dir.

        The CA directory is explicitly set to ``'./'`` to make verification
        fail.
        """
        def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
            # Pass OpenSSL's own verdict straight through.
            return preverify_ok

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ssl_ctx.load_verify_locations(None, './')

        url_opener = build_opener(ssl_context=ssl_ctx)
        self.assertRaises(SSL.Error, url_opener.open, Constants.TEST_URI)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _load_verify_locations_test(self, *args):
        """
        Handshake a client configured via C{load_verify_locations(*args)}
        against a server presenting C{cleartextCertificatePEM}, then assert
        that the peer certificate's subject CN is C{'Testing Root CA'}.
        """
        (server_sock, client_sock) = socket_pair()

        client_ctx = Context(TLSv1_METHOD)
        client_ctx.load_verify_locations(*args)

        def pass_through(conn, cert, errno, depth, preverify_ok):
            return preverify_ok

        # Require that the server certificate verify properly or the
        # connection will fail.
        client_ctx.set_verify(VERIFY_PEER, pass_through)

        client_conn = Connection(client_ctx, client_sock)
        client_conn.set_connect_state()

        server_ctx = Context(TLSv1_METHOD)
        server_cert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
        server_ctx.use_certificate(server_cert)
        server_key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
        server_ctx.use_privatekey(server_key)

        server_conn = Connection(server_ctx, server_sock)
        server_conn.set_accept_state()

        # Drive the handshake from both ends for a fixed three rounds,
        # ignoring "want read" while one side waits on the other.
        for _ in range(3):
            for endpoint in (client_conn, server_conn):
                try:
                    # Without load_verify_locations above, the handshake
                    # will fail:
                    # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
                    #          'certificate verify failed')]
                    endpoint.do_handshake()
                except WantReadError:
                    pass

        peer_cert = client_conn.get_peer_certificate()
        self.assertEqual(peer_cert.get_subject().CN, 'Testing Root CA')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_set_default_verify_paths(self):
            """
            L{Context.set_default_verify_paths} causes the platform-specific CA
            certificate locations to be used for verification purposes.
            """
            # This needs a server whose certificate is signed by a CA from
            # the platform CA location, so the test talks to a public host.
            # Hitting the network from a unit test is bad, but the original
            # author saw no other way to really test this. -exarkun

            def pass_through(conn, cert, errno, depth, preverify_ok):
                return preverify_ok

            # Arg, verisign.com doesn't speak TLSv1
            ssl_ctx = Context(SSLv3_METHOD)
            ssl_ctx.set_default_verify_paths()
            ssl_ctx.set_verify(VERIFY_PEER, pass_through)

            tcp_sock = socket()
            tcp_sock.connect(('verisign.com', 443))
            tls_conn = Connection(ssl_ctx, tcp_sock)
            tls_conn.set_connect_state()
            tls_conn.do_handshake()
            tls_conn.send('GET / HTTP/1.0\r\n\r\n')
            reply = tls_conn.recv(1024)
            self.assertTrue(reply)
项目:2FAssassin    作者:maxwellkoh    | 项目源码 | 文件源码
def go():
    """Endlessly create client/server SSL connection pairs over loopback.

    A listening socket is bound to an ephemeral port.  Each iteration of
    the outer loop connects a non-blocking client to it, wraps both ends in
    a ``Connection`` sharing one context, and keeps sending on both ends
    until the verify callback has fired at least once.  The function never
    returns.
    """
    port = socket()
    port.bind(('', 0))
    port.listen(1)

    called = []

    def info(*args):
        # Verify callback: print a running counter, record that it ran,
        # and accept the certificate.
        # ``next(count)`` and ``print(...)`` with a single argument behave
        # the same on Python 2.6+ and Python 3.
        print(next(count))
        called.append(None)
        return 1

    context = Context(TLSv1_METHOD)
    context.set_verify(VERIFY_PEER, info)
    context.use_certificate(
        load_certificate(FILETYPE_PEM, cleartextCertificatePEM))
    context.use_privatekey(
        load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM))

    while 1:
        client = socket()
        client.setblocking(False)
        client.connect_ex(port.getsockname())

        clientSSL = Connection(context, client)
        clientSSL.set_connect_state()

        server, ignored = port.accept()
        server.setblocking(False)

        serverSSL = Connection(context, server)
        serverSSL.set_accept_state()

        del called[:]
        while not called:
            for conn in (clientSSL, serverSSL):
                try:
                    # Bytes payload: identical to 'foo' on Python 2 and
                    # the type ``send`` expects on Python 3.
                    conn.send(b'foo')
                except WantReadError:
                    # Handshake still in progress on this side; retry.
                    pass
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def test_handshake_fail(self):
        """A client verify callback that always rejects must fail the handshake.

        The client future is expected to raise ``SSL.Error``; the server
        future is expected to raise ``SSL.Error`` or ``socket.error``.
        """
        def reject_all(conn, x509, err_num, err_depth, err_code):
            return False

        server_future = self.server_start_tls(_server_ssl_options())
        client_options = _client_ssl_options(SSL.VERIFY_PEER, reject_all)
        client_future = self.client_start_tls(client_options)

        with self.assertRaises(SSL.Error):
            yield client_future
        with self.assertRaises((SSL.Error, socket.error)):
            yield server_future
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def test_check_hostname(self):
        """Starting client TLS with ``server_hostname=b'localhost'`` must fail.

        The verify callback accepts everything, yet the client future is
        expected to raise ``VerificationError``.
        """
        def accept_all(conn, x509, err_num, err_depth, err_code):
            return True

        server_future = self.server_start_tls(_server_ssl_options())
        client_options = _client_ssl_options(SSL.VERIFY_PEER, accept_all)
        client_future = self.client_start_tls(
            client_options, server_hostname=b'localhost')

        with self.assertRaises(VerificationError):
            yield client_future
        # TODO: server will not raise.
        # with self.assertRaises(Exception):
        yield server_future
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _load_verify_locations_test(self, *args):
        """
        Handshake a client configured via C{load_verify_locations(*args)}
        against a server presenting C{cleartextCertificatePEM}, then assert
        that the peer certificate's subject CN is C{'Testing Root CA'}.
        """
        (server_sock, client_sock) = socket_pair()

        client_ctx = Context(TLSv1_METHOD)
        client_ctx.load_verify_locations(*args)

        def pass_through(conn, cert, errno, depth, preverify_ok):
            return preverify_ok

        # Require that the server certificate verify properly or the
        # connection will fail.
        client_ctx.set_verify(VERIFY_PEER, pass_through)

        client_conn = Connection(client_ctx, client_sock)
        client_conn.set_connect_state()

        server_ctx = Context(TLSv1_METHOD)
        server_cert = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
        server_ctx.use_certificate(server_cert)
        server_key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
        server_ctx.use_privatekey(server_key)

        server_conn = Connection(server_ctx, server_sock)
        server_conn.set_accept_state()

        # Drive the handshake from both ends for a fixed three rounds,
        # ignoring "want read" while one side waits on the other.
        for _ in range(3):
            for endpoint in (client_conn, server_conn):
                try:
                    # Without load_verify_locations above, the handshake
                    # will fail:
                    # Error: [('SSL routines', 'SSL3_GET_SERVER_CERTIFICATE',
                    #          'certificate verify failed')]
                    endpoint.do_handshake()
                except WantReadError:
                    pass

        peer_cert = client_conn.get_peer_certificate()
        self.assertEqual(peer_cert.get_subject().CN, 'Testing Root CA')
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_set_default_verify_paths(self):
            """
            L{Context.set_default_verify_paths} causes the platform-specific CA
            certificate locations to be used for verification purposes.
            """
            # This needs a server whose certificate is signed by a CA from
            # the platform CA location, so the test talks to a public host.
            # Hitting the network from a unit test is bad, but the original
            # author saw no other way to really test this. -exarkun

            def pass_through(conn, cert, errno, depth, preverify_ok):
                return preverify_ok

            # Arg, verisign.com doesn't speak TLSv1
            ssl_ctx = Context(SSLv3_METHOD)
            ssl_ctx.set_default_verify_paths()
            ssl_ctx.set_verify(VERIFY_PEER, pass_through)

            tcp_sock = socket()
            tcp_sock.connect(('verisign.com', 443))
            tls_conn = Connection(ssl_ctx, tcp_sock)
            tls_conn.set_connect_state()
            tls_conn.do_handshake()
            tls_conn.send('GET / HTTP/1.0\r\n\r\n')
            reply = tls_conn.recv(1024)
            self.assertTrue(reply)
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def _makeContext(self):
        """Build and return the ``SSL.Context`` described by this object.

        The context uses ``self.method`` and carries a fresh
        ``_SSLApplicationData``.  The certificate and private key are
        installed (and checked against each other) when both are set.  When
        ``self.verify`` is true, a peer certificate is required and
        verified, and every certificate in ``self.caCerts`` is added to the
        trust store.  When ``self.enableSessions`` is true, a session id is
        set.
        """
        ctx = SSL.Context(self.method)
        ctx.set_app_data(_SSLApplicationData())

        if self.certificate is not None and self.privateKey is not None:
            ctx.use_certificate(self.certificate)
            ctx.use_privatekey(self.privateKey)
            # Sanity check
            ctx.check_privatekey()

        verifyFlags = SSL.VERIFY_NONE
        if self.verify:
            verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
            if self.caCerts:
                store = ctx.get_cert_store()
                for cert in self.caCerts:
                    store.add_cert(cert)

        def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
            # Return OpenSSL's own verdict.  The previous unconditional
            # ``True`` overrode every verification failure, so an untrusted
            # or invalid peer certificate was accepted even with
            # VERIFY_PEER requested.
            return preverify_ok

        ctx.set_verify(verifyFlags, _trackVerificationProblems)

        if self.enableSessions:
            sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__), _sessionCounter())).hexdigest()
            ctx.set_session_id(sessionName)

        return ctx
项目:python-gnutls    作者:AGProjects    | 项目源码 | 文件源码
def _makeContext(self):
        """Build and return the ``SSL.Context`` described by this object.

        The context uses ``self.method`` and carries a fresh
        ``_SSLApplicationData``.  The certificate and private key are
        installed (and checked against each other) when both are set.  When
        ``self.verify`` is true, a peer certificate is required and
        verified, and every certificate in ``self.caCerts`` is added to the
        trust store.  When ``self.enableSessions`` is true, a session id is
        set.
        """
        ctx = SSL.Context(self.method)
        ctx.set_app_data(_SSLApplicationData())

        if self.certificate is not None and self.privateKey is not None:
            ctx.use_certificate(self.certificate)
            ctx.use_privatekey(self.privateKey)
            # Sanity check
            ctx.check_privatekey()

        verifyFlags = SSL.VERIFY_NONE
        if self.verify:
            verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
            if self.caCerts:
                store = ctx.get_cert_store()
                for cert in self.caCerts:
                    store.add_cert(cert)

        def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
            # Return OpenSSL's own verdict.  The previous unconditional
            # ``True`` overrode every verification failure, so an untrusted
            # or invalid peer certificate was accepted even with
            # VERIFY_PEER requested.
            return preverify_ok

        ctx.set_verify(verifyFlags, _trackVerificationProblems)

        if self.enableSessions:
            sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__), _sessionCounter())).hexdigest()
            ctx.set_session_id(sessionName)

        return ctx
项目:mu    作者:excamera    | 项目源码 | 文件源码
def ssl_context(cacert, srvcrt, srvkey):
    """Build a TLSv1.2 context that requires a verified peer certificate.

    Args:
        cacert: CA certificate; run through ``format_ssl_cert`` and added to
            the context's trust store.
        srvcrt: space-separated certificates.  The first becomes the
            context's own certificate, the rest are added as extra chain
            certificates.
        srvkey: private key; run through ``format_ssl_key`` before loading.

    Returns:
        The configured ``SSL.Context``, after ``check_privatekey()`` passes.
    """
    # general setup: TLSv1.2, no compression, cipher list from libmu defs
    sslctx = SSL.Context(SSL.TLSv1_2_METHOD)
    sslctx.set_verify_depth(9)
    sslctx.set_options(SSL.OP_NO_COMPRESSION)
    write_mode = (_ssl_lib.SSL_MODE_ENABLE_PARTIAL_WRITE
                  | _ssl_lib.SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER)
    sslctx.set_mode(write_mode)
    sslctx.set_cipher_list(libmu.defs.Defs.cipher_list)

    def _pass_through(_conn, _cert, _errno, _depth, ok):
        # Hand back OpenSSL's own verdict unchanged.
        return ok

    sslctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
                      _pass_through)

    # trust the CA certificate supplied by the caller
    ca_pem = format_ssl_cert(cacert)
    ca_x509 = crypto.load_certificate(crypto.FILETYPE_PEM, ca_pem)
    sslctx.get_cert_store().add_cert(ca_x509)

    # own certificate chain: first entry is the certificate, the rest
    # are extra chain certificates
    for position, pem in enumerate(srvcrt.split(' ')):
        chain_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
                                             format_ssl_cert(pem))
        if position == 0:
            sslctx.use_certificate(chain_cert)
        else:
            sslctx.add_extra_chain_cert(chain_cert)

    # private key
    private_key = crypto.load_privatekey(crypto.FILETYPE_PEM,
                                         format_ssl_key(srvkey))
    sslctx.use_privatekey(private_key)

    # check that key and certificate match
    sslctx.check_privatekey()

    return sslctx

###
#  SSLize a connected socket, requiring a supplied cacert
###
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def set_peer_verification_for_url_hostname(ssl_context, url, 
                                           if_verify_enabled=False):
    """Install a peer verification callback built from the URL's hostname.

    The callback comes from a ``ServerSSLCertVerification`` constructed with
    the hostname parsed out of ``url`` and is registered with
    ``SSL.VERIFY_PEER``.  When ``if_verify_enabled`` is true, nothing is
    done unless the context already has ``SSL.VERIFY_PEER`` set.
    """
    if if_verify_enabled and not (ssl_context.get_verify_mode() & SSL.VERIFY_PEER):
        return

    target_host = urlparse_.urlparse(url).hostname
    verifier = ServerSSLCertVerification(hostname=target_host)
    ssl_context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def set_peer_verification_for_url_hostname(ssl_context, url, 
                                           if_verify_enabled=False):
    """Install a peer verification callback built from the URL's hostname.

    The callback comes from a ``ServerSSLCertVerification`` constructed with
    the hostname parsed out of ``url`` and is registered with
    ``SSL.VERIFY_PEER``.  When ``if_verify_enabled`` is true, nothing is
    done unless the context already has ``SSL.VERIFY_PEER`` set.
    """
    if if_verify_enabled and not (ssl_context.get_verify_mode() & SSL.VERIFY_PEER):
        return

    target_host = urlparse_.urlparse(url).hostname
    verifier = ServerSSLCertVerification(hostname=target_host)
    ssl_context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def set_peer_verification_for_url_hostname(ssl_context, url, 
                                           if_verify_enabled=False):
    """Install a peer verification callback built from the URL's hostname.

    The callback comes from a ``ServerSSLCertVerification`` constructed with
    the hostname parsed out of ``url`` and is registered with
    ``SSL.VERIFY_PEER``.  When ``if_verify_enabled`` is true, nothing is
    done unless the context already has ``SSL.VERIFY_PEER`` set.
    """
    if if_verify_enabled and not (ssl_context.get_verify_mode() & SSL.VERIFY_PEER):
        return

    target_host = urlparse_.urlparse(url).hostname
    verifier = ServerSSLCertVerification(hostname=target_host)
    ssl_context.set_verify(SSL.VERIFY_PEER,
                           verifier.get_verify_server_cert_func())
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def test04_open_peer_cert_verification_fails(self):
        """Opening the test URI must raise ``SSL.Error`` with a bad CA dir.

        The CA directory is explicitly set to ``'./'`` to make verification
        fail.
        """
        def verify_callback(conn, x509, errnum, errdepth, preverify_ok):
            # Pass OpenSSL's own verdict straight through.
            return preverify_ok

        ssl_ctx = SSL.Context(SSL.TLSv1_METHOD)
        ssl_ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ssl_ctx.load_verify_locations(None, './')

        url_opener = build_opener(ssl_context=ssl_ctx)
        self.assertRaises(SSL.Error, url_opener.open, Constants.TEST_URI)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def testFailedVerify(self):
        """Connect an SSL client whose verify callback rejects everything.

        Both the server and client protocols report ``connectionLost`` into
        Deferreds; the returned ``DeferredList`` fires ``_cbLostConns`` once
        both have been lost.
        """
        org = "twisted.test.test_ssl"
        self.setupServerAndClient(
            (org, org + ", client"), {},
            (org, org + ", server"), {})

        def verify(*a):
            # Reject every certificate presented to the client.
            return False
        self.clientCtxFactory.getContext().set_verify(SSL.VERIFY_PEER, verify)

        # Server side: a single protocol instance whose connectionLost
        # fires a Deferred.
        server_lost = defer.Deferred()
        server_proto = protocol.Protocol()
        server_proto.connectionLost = server_lost.callback
        server_factory = protocol.ServerFactory()
        server_factory.protocol = lambda: server_proto
        listening_port = reactor.listenSSL(
            0, server_factory, self.serverCtxFactory)
        self.serverPort = listening_port

        # Client side: same arrangement, connecting to the port above.
        client_lost = defer.Deferred()
        client_proto = protocol.Protocol()
        client_proto.connectionLost = client_lost.callback
        client_factory = protocol.ClientFactory()
        client_factory.protocol = lambda: client_proto
        reactor.connectSSL('127.0.0.1', listening_port.getHost().port,
                           client_factory, self.clientCtxFactory)

        both_lost = defer.DeferredList([server_lost, client_lost],
                                       consumeErrors=True)
        return both_lost.addCallback(self._cbLostConns)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Create an SSL context holding certificate and key file locations.

    The private key is read from ``key_file``, falling back to ``cert_file``
    when no key file is given.  ``pem_file`` / ``ca_dir`` are passed to
    ``load_verify_locations`` when either is set.  With ``verify_peer`` the
    verify depth is set to 9 and peer verification is enabled, through
    ``set_peer_verification_for_url_hostname`` when ``url`` is given;
    otherwise the context is set to ``SSL.VERIFY_NONE``.
    """
    def _passthrough_verify(conn, x509, errnum, errdepth, preverify_ok):
        """Default verification callback: returns the status passed in."""
        return preverify_ok

    ssl_context = SSL.Context(method)

    if cert_file:
        ssl_context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _passphrase_cb(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase
        ssl_context.set_passwd_cb(_passphrase_cb)

    # Key file defaults to certificate file if present.
    private_key_source = key_file or cert_file
    if private_key_source:
        ssl_context.use_privatekey_file(private_key_source)

    if pem_file or ca_dir:
        ssl_context.load_verify_locations(pem_file, ca_dir)

    if not verify_peer:
        ssl_context.set_verify(SSL.VERIFY_NONE, _passthrough_verify)
        return ssl_context

    ssl_context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(ssl_context, url)
    else:
        ssl_context.set_verify(SSL.VERIFY_PEER, _passthrough_verify)

    return ssl_context
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _makeContext(self):
        """Build and return the ``SSL.Context`` described by this object.

        Installs the certificate/private key when both are set, derives the
        verify flags from ``verify``, ``requireCertificate`` and
        ``verifyOnce``, adds ``caCerts`` to the trust store, and applies the
        optional verify depth, single-use DH keys, broken-peer workarounds
        and session id settings.  Verification failures are recorded on the
        context's application data.
        """
        ctx = SSL.Context(self.method)
        ctx.set_app_data(_SSLApplicationData())

        have_identity = (self.certificate is not None
                         and self.privateKey is not None)
        if have_identity:
            ctx.use_certificate(self.certificate)
            ctx.use_privatekey(self.privateKey)
            # Sanity check
            ctx.check_privatekey()

        if self.verify:
            verifyFlags = SSL.VERIFY_PEER
            if self.requireCertificate:
                verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
            if self.verifyOnce:
                verifyFlags |= SSL.VERIFY_CLIENT_ONCE
            if self.caCerts:
                trust_store = ctx.get_cert_store()
                for ca_cert in self.caCerts:
                    trust_store.add_cert(ca_cert)
        else:
            verifyFlags = SSL.VERIFY_NONE

        def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
            # preverify_ok is the verdict OpenSSL reached for this
            # certificate; record failures and return it unchanged.
            if preverify_ok:
                return preverify_ok
            problem = OpenSSLVerifyError(cert, errno, depth)
            ctx.get_app_data().problems.append(problem)
            return preverify_ok

        ctx.set_verify(verifyFlags, _trackVerificationProblems)

        if self.verifyDepth is not None:
            ctx.set_verify_depth(self.verifyDepth)

        if self.enableSingleUseKeys:
            ctx.set_options(SSL.OP_SINGLE_DH_USE)

        if self.fixBrokenPeers:
            ctx.set_options(self._OP_ALL)

        if self.enableSessions:
            session_seed = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
            ctx.set_session_id(md5.md5(session_seed).hexdigest())

        return ctx
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _makeContext(self):
        """Build and return the ``SSL.Context`` described by this object.

        Installs the certificate/private key when both are set, derives the
        verify flags from ``verify``, ``requireCertificate`` and
        ``verifyOnce``, adds ``caCerts`` to the trust store, and applies the
        optional verify depth, single-use DH keys, broken-peer workarounds
        and session id settings.  Verification failures are recorded on the
        context's application data.
        """
        ctx = SSL.Context(self.method)
        ctx.set_app_data(_SSLApplicationData())

        have_identity = (self.certificate is not None
                         and self.privateKey is not None)
        if have_identity:
            ctx.use_certificate(self.certificate)
            ctx.use_privatekey(self.privateKey)
            # Sanity check
            ctx.check_privatekey()

        if self.verify:
            verifyFlags = SSL.VERIFY_PEER
            if self.requireCertificate:
                verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
            if self.verifyOnce:
                verifyFlags |= SSL.VERIFY_CLIENT_ONCE
            if self.caCerts:
                trust_store = ctx.get_cert_store()
                for ca_cert in self.caCerts:
                    trust_store.add_cert(ca_cert)
        else:
            verifyFlags = SSL.VERIFY_NONE

        def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
            # preverify_ok is the verdict OpenSSL reached for this
            # certificate; record failures and return it unchanged.
            if preverify_ok:
                return preverify_ok
            problem = OpenSSLVerifyError(cert, errno, depth)
            ctx.get_app_data().problems.append(problem)
            return preverify_ok

        ctx.set_verify(verifyFlags, _trackVerificationProblems)

        if self.verifyDepth is not None:
            ctx.set_verify_depth(self.verifyDepth)

        if self.enableSingleUseKeys:
            ctx.set_options(SSL.OP_SINGLE_DH_USE)

        if self.fixBrokenPeers:
            ctx.set_options(self._OP_ALL)

        if self.enableSessions:
            session_seed = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
            ctx.set_session_id(md5.md5(session_seed).hexdigest())

        return ctx
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Create an SSL context loaded with the given certificate and key files.

    :param key_file: private key file; when omitted, ``cert_file`` is used
        as the private key file instead (if given).
    :param cert_file: certificate file.
    :param pem_file: CA file passed to ``load_verify_locations``.
    :param ca_dir: CA directory passed to ``load_verify_locations``.
    :param verify_peer: when true, set the verify depth to 9 and enable
        peer verification; otherwise ``SSL.VERIFY_NONE`` is used.
    :param url: when given together with ``verify_peer``, verification is
        configured by ``set_peer_verification_for_url_hostname``.
    :param method: method constant handed to ``SSL.Context``.
    :param key_file_passphrase: passphrase returned by the password
        callback installed on the context.
    :return: the configured ``SSL.Context``.
    """
    def _passthrough(conn, x509, errnum, errdepth, preverify_ok):
        """Verification callback that performs no checks of its own and
        returns the status passed in.
        """
        return preverify_ok

    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _passphrase(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase
        context.set_passwd_cb(_passphrase)

    # The private key comes from key_file, falling back to the certificate
    # file when no separate key file was supplied.
    private_key_source = key_file or cert_file
    if private_key_source:
        context.use_privatekey_file(private_key_source)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _passthrough)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _passthrough)
    return context
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Return an ``SSL.Context`` set up with certificate, key and CA locations.

    ``cert_file`` is loaded as the certificate.  The private key is read
    from ``key_file`` or, failing that, from ``cert_file``.  ``pem_file``
    and ``ca_dir`` are passed to ``load_verify_locations`` when either is
    given.  If ``key_file_passphrase`` is set, a password callback returning
    it is installed before the private key is loaded.

    With ``verify_peer`` the verify depth is set to 9 and either
    ``set_peer_verification_for_url_hostname`` (when ``url`` is given) or a
    plain ``SSL.VERIFY_PEER`` callback is installed; without it the context
    uses ``SSL.VERIFY_NONE``.
    """
    def _return_preverify_status(conn, x509, errnum, errdepth, preverify_ok):
        """Default verification callback: no extra checks, returns the
        status passed in.
        """
        return preverify_ok

    ctx = SSL.Context(method)

    if cert_file:
        ctx.use_certificate_file(cert_file)

    if key_file_passphrase:
        ctx.set_passwd_cb(
            lambda max_passphrase_len, set_prompt, userdata:
                key_file_passphrase)

    # Key file defaults to the certificate file when no key file is given.
    if key_file:
        ctx.use_privatekey_file(key_file)
    elif cert_file:
        ctx.use_privatekey_file(cert_file)

    if pem_file or ca_dir:
        ctx.load_verify_locations(pem_file, ca_dir)

    if verify_peer:
        ctx.set_verify_depth(9)

    if verify_peer and url:
        set_peer_verification_for_url_hostname(ctx, url)
    else:
        mode = SSL.VERIFY_PEER if verify_peer else SSL.VERIFY_NONE
        ctx.set_verify(mode, _return_preverify_status)

    return ctx
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Construct an SSL context from certificate, key and CA file locations.

    :param key_file: file to load the private key from.
    :param cert_file: file to load the certificate from; also used for the
        private key when ``key_file`` is not given.
    :param pem_file: first argument to ``load_verify_locations``.
    :param ca_dir: second argument to ``load_verify_locations``.
    :param verify_peer: enable peer verification (verify depth 9).
    :param url: with ``verify_peer``, hand verification setup to
        ``set_peer_verification_for_url_hostname``.
    :param method: passed straight to ``SSL.Context``.
    :param key_file_passphrase: value returned by the password callback.
    :return: the new ``SSL.Context``.
    """
    new_context = SSL.Context(method)

    if cert_file:
        new_context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def supply_passphrase(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase
        new_context.set_passwd_cb(supply_passphrase)

    # Use the first available source for the private key: the explicit key
    # file, otherwise the certificate file.
    for candidate in (key_file, cert_file):
        if candidate:
            new_context.use_privatekey_file(candidate)
            break

    if pem_file or ca_dir:
        new_context.load_verify_locations(pem_file, ca_dir)

    def accept_preverify(conn, x509, errnum, errdepth, preverify_ok):
        """Default verification callback; does no checking and returns the
        status it was given.
        """
        return preverify_ok

    if verify_peer:
        new_context.set_verify_depth(9)
        if not url:
            new_context.set_verify(SSL.VERIFY_PEER, accept_preverify)
        else:
            set_peer_verification_for_url_hostname(new_context, url)
    else:
        new_context.set_verify(SSL.VERIFY_NONE, accept_preverify)

    return new_context
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _makeContext(self):
        """
        Create and configure a new context from the options stored on this
        object.

        The context is obtained from C{self._contextFactory(self.method)}
        and then given, in order: options and mode, the certificate / key /
        extra chain (when both certificate and key are set), verification
        flags and callback, verify depth, session id, DH parameters, the
        cipher list, an EC curve (best effort) and the acceptable
        next-protocol list.

        @return: the configured context.
        """
        ctx = self._contextFactory(self.method)
        ctx.set_options(self._options)
        ctx.set_mode(self._mode)

        if self.certificate is not None and self.privateKey is not None:
            ctx.use_certificate(self.certificate)
            ctx.use_privatekey(self.privateKey)
            for extraCert in self.extraCertChain:
                ctx.add_extra_chain_cert(extraCert)
            # Sanity check
            ctx.check_privatekey()

        verifyFlags = SSL.VERIFY_NONE
        if self.verify:
            verifyFlags = SSL.VERIFY_PEER
            if self.requireCertificate:
                verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
            if self.verifyOnce:
                verifyFlags |= SSL.VERIFY_CLIENT_ONCE
            self.trustRoot._addCACertsToContext(ctx)

        # It'd be nice if pyOpenSSL let us pass None here for this behavior (as
        # the underlying OpenSSL API call allows NULL to be passed).  It
        # doesn't, so we'll supply a function which does the same thing.
        def _verifyCallback(conn, cert, errno, depth, preverify_ok):
            return preverify_ok
        ctx.set_verify(verifyFlags, _verifyCallback)
        if self.verifyDepth is not None:
            ctx.set_verify_depth(self.verifyDepth)

        if self.enableSessions:
            # Session id: md5 hex digest of the qualified class name plus
            # the value returned by _sessionCounter().
            name = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
            sessionName = md5(networkString(name)).hexdigest()

            ctx.set_session_id(sessionName.encode('ascii'))

        if self.dhParameters:
            ctx.load_tmp_dh(self.dhParameters._dhFile.path)
        ctx.set_cipher_list(self._cipherString.encode('ascii'))

        if self._ecCurve is not None:
            try:
                self._ecCurve.addECKeyToContext(ctx)
            except Exception:
                # ECDHE support is best effort only, so ordinary failures
                # are ignored.  Exception (rather than BaseException) is
                # caught so that KeyboardInterrupt and SystemExit are not
                # silently swallowed here.
                pass

        if self._acceptableProtocols:
            # Try to set NPN and ALPN. _acceptableProtocols cannot be set by
            # the constructor unless at least one mechanism is supported.
            _setAcceptableProtocols(ctx, self._acceptableProtocols)

        return ctx