Python ssl 模块,create_default_context() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ssl.create_default_context()

项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def init_mtls(config):
    logger.info("Setting up mTLS...")

    tls_dir = config["ca_dir"]
    if tls_dir[0]!='/':
        tls_dir = os.path.abspath('%s/%s'%(common.WORK_DIR,tls_dir))

    # We need to securely pull in the ca password 
    my_key_pw = getpass.getpass("Please enter the password to decrypt your keystore: ")
    ca_util.setpassword(my_key_pw)

    # Create HIL Server Connect certs (if not already present) 
    if not os.path.exists("%s/%s-cert.crt"%(tls_dir,config["ip"])):
        logger.info("Generating new Node Monitor TLS Certs in %s for connecting"%tls_dir)
        ca_util.cmd_mkcert(tls_dir,config["ip"])

    ca_path = "%s/cacert.crt"%(tls_dir)
    my_cert = "%s/%s-cert.crt"%(tls_dir,config["ip"])
    my_priv_key = "%s/%s-private.pem"%(tls_dir,config["ip"])

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_verify_locations(cafile=ca_path)
    context.load_cert_chain(certfile=my_cert,keyfile=my_priv_key,password=my_key_pw)
    context.verify_mode = ssl.CERT_REQUIRED
    return context
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
项目:rci    作者:seecloud    | 项目源码 | 文件源码
def __init__(self, auth_url, username, tenant, loop=None, log=None,
                 cafile=None, token_renew_delay=3300):
        self.auth_url = auth_url
        self.username = username
        self.tenant = tenant
        self.log = log
        self.token_renew_delay = token_renew_delay
        self.loop = loop or asyncio.get_event_loop()
        self.headers = {"content-type": "application/json",
                        "accept": "application/json"}
        if cafile:
            sslcontext = ssl.create_default_context(cafile=cafile)
            conn = aiohttp.TCPConnector(ssl_context=sslcontext)
            self.session = aiohttp.ClientSession(connector=conn, loop=self.loop)
        else:
            session = aiohttp.ClientSession(loop=self.loop)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def _create_transport_context(server_side, server_hostname):
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # Client side may pass ssl=True to use a default
    # context; in that case the sslcontext passed is None.
    # The default is secure for client connections.
    if hasattr(ssl, 'create_default_context'):
        # Python 3.4+: use up-to-date strong settings.
        sslcontext = ssl.create_default_context()
        if not server_hostname:
            sslcontext.check_hostname = False
    else:
        # Fallback for Python 3.3.
        sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslcontext.options |= ssl.OP_NO_SSLv2
        sslcontext.options |= ssl.OP_NO_SSLv3
        sslcontext.set_default_verify_paths()
        sslcontext.verify_mode = ssl.CERT_REQUIRED
    return sslcontext
项目:Qyoutube-dl    作者:lzambella    | 项目源码 | 文件源码
def make_HTTPS_handler(params, **kwargs):
    opts_no_check_certificate = params.get('nocheckcertificate', False)
    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if opts_no_check_certificate:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)
    else:  # Python < 3.4
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (ssl.CERT_NONE
                               if opts_no_check_certificate
                               else ssl.CERT_REQUIRED)
        context.set_default_verify_paths()
        return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
项目:watchmen    作者:lycclsltt    | 项目源码 | 文件源码
def _create_ssl_ctx(self, sslp):
        if isinstance(sslp, ssl.SSLContext):
            return sslp
        ca = sslp.get('ca')
        capath = sslp.get('capath')
        hasnoca = ca is None and capath is None
        ctx = ssl.create_default_context(cafile=ca, capath=capath)
        ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
        ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
        if 'cert' in sslp:
            ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
        if 'cipher' in sslp:
            ctx.set_ciphers(sslp['cipher'])
        ctx.options |= ssl.OP_NO_SSLv2
        ctx.options |= ssl.OP_NO_SSLv3
        return ctx
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
        if (timeout is not None) and not self.supports_feature('timeout'):
            raise RuntimeError('timeout is not supported with urllib2 transport')
        if proxy:
            raise RuntimeError('proxy is not supported with urllib2 transport')
        if cacert:
            raise RuntimeError('cacert is not support with urllib2 transport')

        handlers = []

        if ((sys.version_info[0] == 2 and sys.version_info >= (2,7,9)) or
            (sys.version_info[0] == 3 and sys.version_info >= (3,2,0))):
            context = ssl.create_default_context()
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            handlers.append(urllib2.HTTPSHandler(context=context))

        if sessions:
            handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))

        opener = urllib2.build_opener(*handlers)
        self.request_opener = opener.open
        self._timeout = timeout
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def _create_ssl_ctx(self, sslp):
        if isinstance(sslp, ssl.SSLContext):
            return sslp
        ca = sslp.get('ca')
        capath = sslp.get('capath')
        hasnoca = ca is None and capath is None
        ctx = ssl.create_default_context(cafile=ca, capath=capath)
        ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
        ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
        if 'cert' in sslp:
            ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
        if 'cipher' in sslp:
            ctx.set_ciphers(sslp['cipher'])
        ctx.options |= ssl.OP_NO_SSLv2
        ctx.options |= ssl.OP_NO_SSLv3
        return ctx
项目:gprime    作者:GenealogyCollective    | 项目源码 | 文件源码
def urlopen_maybe_no_check_cert(URL):
    """
    Similar to urllib.request.urlopen, but disables certificate
    verification on Mac.
    """
    context = None
    from urllib.request import urlopen
    if mac():
        from ssl import create_default_context, CERT_NONE
        context = create_default_context()
        context.check_hostname = False
        context.verify_mode = CERT_NONE
    timeout = 10 # seconds
    fp = None
    try:
        fp = urlopen(URL, timeout=timeout, context=context)
    except TypeError:
        fp = urlopen(URL, timeout=timeout)
    return fp
项目:1and1-Mail-account-Manager    作者:sysadminstory    | 项目源码 | 文件源码
def __init__(self, username, password, provider):
        """Constructor used to init the config and authenticate"""
        self.config = oneandoneemailconfig.OneAndOneConfig(provider)
        self.url = self.config.get_config()
        self.account_cached = False

        self.oneandoneuser = username
        self.onenandonepassword = password
        self.headers = {'User-Agent': EmailAccountManager.userAgent}
        self.cookies = http.cookiejar.LWPCookieJar()
        handlers = [
            urllib.request.HTTPHandler(),
            urllib.request.HTTPSHandler(),
            urllib.request.HTTPCookieProcessor(self.cookies)
        ]
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self.opener = urllib.request.build_opener(
            urllib.request.HTTPSHandler(context=ctx), *handlers)
        self.is_logged = self.authenticate()
项目:telepresence    作者:datawire    | 项目源码 | 文件源码
def kubectl_or_oc(server: str) -> str:
    """
    Return "kubectl" or "oc", the command-line tool we should use.

    :param server: The URL of the cluster API server.
    """
    if which("oc") is None:
        return "kubectl"
    # We've got oc, and possibly kubectl as well. We only want oc for OpenShift
    # servers, so check for an OpenShift API endpoint:
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        with urlopen(server + "/version/openshift", context=ctx) as u:
            u.read()
    except HTTPError:
        return "kubectl"
    else:
        return "oc"
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def make_HTTPS_handler(params, **kwargs):
    opts_no_check_certificate = params.get('nocheckcertificate', False)
    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if opts_no_check_certificate:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)
    else:  # Python < 3.4
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (ssl.CERT_NONE
                               if opts_no_check_certificate
                               else ssl.CERT_REQUIRED)
        context.set_default_verify_paths()
        return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def get_opener():
    autoproxy = '127.0.0.1:8087'

    import ssl
    if getattr(ssl, "create_default_context", None):
        cafile = os.path.join(data_root, "gae_proxy", "CA.crt")
        if not os.path.isfile(cafile):
            cafile = None
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile)
        https_handler = urllib2.HTTPSHandler(context=context)

        opener = urllib2.build_opener(urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy}), https_handler)
    else:
        opener = urllib2.build_opener(urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy}))
    return opener
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def start_connection(self) -> None:
        if self.settings.ftps['no_certificate_check']:
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            context.verify_mode = ssl.CERT_NONE
            context.check_hostname = False
        else:
            context = ssl.create_default_context()
        self.ftps = FTP_TLS(
            host=self.settings.ftps['address'],
            user=self.settings.ftps['user'],
            passwd=self.settings.ftps['passwd'],
            context=context,
            source_address=self.settings.ftps['source_address'],
            timeout=self.settings.timeout_timer
        )

        # Hath downloads
        self.ftps.prot_p()
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def get_opener():
    autoproxy = '127.0.0.1:8087'

    import ssl
    if getattr(ssl, "create_default_context", None):
        cafile = os.path.join(data_root, "gae_proxy", "CA.crt")
        if not os.path.isfile(cafile):
            cafile = None
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile)
        https_handler = urllib2.HTTPSHandler(context=context)

        opener = urllib2.build_opener(urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy}), https_handler)
    else:
        opener = urllib2.build_opener(urllib2.ProxyHandler({'http': autoproxy, 'https': autoproxy}))
    return opener
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def server_thread_fn():
    server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    server_ctx.load_cert_chain("trio-test-1.pem")
    server = server_ctx.wrap_socket(
        server_sock,
        server_side=True,
        suppress_ragged_eofs=False,
    )
    while True:
        data = server.recv(4096)
        print("server got:", data)
        if not data:
            print("server waiting for client to finish everything")
            client_done.wait()
            print("server attempting to send back close-notify")
            server.unwrap()
            print("server ok")
            break
        server.sendall(data)
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def test_ssl_handshake_failure_during_aclose():
    # Weird scenario: aclose() triggers an automatic handshake, and this
    # fails. This also exercises a bit of code in aclose() that was otherwise
    # uncovered, for re-raising exceptions after calling aclose_forcefully on
    # the underlying transport.
    async with ssl_echo_server_raw(expect_fail=True) as sock:
        # Don't configure trust correctly
        client_ctx = stdlib_ssl.create_default_context()
        s = tssl.SSLStream(
            sock, client_ctx, server_hostname="trio-test-1.example.org"
        )
        # It's a little unclear here whether aclose should swallow the error
        # or let it escape. We *do* swallow the error if it arrives when we're
        # sending close_notify, because both sides closing the connection
        # simultaneously is allowed. But I guess when https_compatible=False
        # then it's bad if we can get through a whole connection with a peer
        # that has no valid certificate, and never raise an error.
        with pytest.raises(BrokenStreamError) as excinfo:
            await s.aclose()
项目:aerospike-telemetry-agent    作者:aerospike    | 项目源码 | 文件源码
def __init__(self, url, proxy, cafile):
        self.url = url
        self.proxy = proxy
        if proxy:
            logging.info("Using HTTPS proxy: " + proxy)
            proxy_handler = urllib2.ProxyHandler({'https': proxy})
            opener = urllib2.build_opener(proxy_handler)
            urllib2.install_opener(opener)
        self.kwargs = {}
        if cafile and hasattr(ssl, "create_default_context"):
            logging.info("Using CA file: " + cafile)
            ctx = ssl.create_default_context()
            ctx.load_verify_locations(cafile = cafile)
            self.kwargs['context'] = ctx

    # given an infoMap returned by the local node, call up the home server
项目:Hermes    作者:nmpiazza    | 项目源码 | 文件源码
def parse_and_run(args=None):
    sslBaseDir = path.join(BASE_DIR, 'ssl')

    p = ArgumentParser()
    p.add_argument('--bind', '-b', action='store', help='the address to bind to', default='127.0.0.1')
    p.add_argument('--port', '-p', action='store', type=int, help='the port to listen on', default=8080)
    p.add_argument('--debug', '-d', action='store_true', help='enable debugging (use with caution)', default=False)
    p.add_argument('--ssl', '-s', action='store_true', help='enable ssl', default=False)

    args = p.parse_args(args)
    if args.ssl:
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ctx.load_cert_chain(path.join(sslBaseDir, 'server.crt'), path.join(sslBaseDir, 'server.key'))
        app.config['SESSION_TYPE'] = 'filesystem'

        app.run(host=args.bind, port=args.port, debug=args.debug, ssl_context=ctx)
    else:
        app.run(host=args.bind, port=args.port, debug=args.debug)
项目:Hermes    作者:nmpiazza    | 项目源码 | 文件源码
def parse_and_run(args=None):
    basedir = path.join(path.abspath(path.dirname(__file__)), 'Hermes', 'Server', 'ssl')
    p = ArgumentParser()
    p.add_argument('--bind', '-b', action='store', help='the address to bind to', default='127.0.0.1')
    p.add_argument('--port', '-p', action='store', type=int, help='the port to listen on', default=8080)
    p.add_argument('--debug', '-d', action='store_true', help='enable debugging (use with caution)', default=False)
    p.add_argument('--ssl', '-s', action='store_true', help='enable ssl', default=False)

    args = p.parse_args(args)
    if args.ssl:
        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ctx.load_cert_chain(path.join(basedir, 'server.crt'), path.join(basedir, 'server.key'))
        app.config['SESSION_TYPE'] = 'filesystem'
        app.run(host=args.bind, port=args.port, debug=args.debug, ssl_context=ctx)
    else:
        app.run(host=args.bind, port=args.port, debug=args.debug)
项目:imapautofiler    作者:dhellmann    | 项目源码 | 文件源码
def __init__(self, cfg):
        super().__init__(cfg)

        # Use default client behavior if ca_file not provided.
        context = None
        if 'ca_file' in cfg['server']:
            context = ssl.create_default_context(
                cafile=cfg['server']['ca_file']
            )

        self._conn = imapclient.IMAPClient(
            cfg['server']['hostname'],
            use_uid=True,
            ssl=True,
            port=cfg['server'].get('port'),
            ssl_context=context,
        )
        username = cfg['server']['username']
        password = secrets.get_password(cfg)
        self._conn.login(username, password)
项目:optimalvibes    作者:littlemika    | 项目源码 | 文件源码
def make_HTTPS_handler(params, **kwargs):
    opts_no_check_certificate = params.get('nocheckcertificate', False)
    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if opts_no_check_certificate:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)
    else:  # Python < 3.4
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (ssl.CERT_NONE
                               if opts_no_check_certificate
                               else ssl.CERT_REQUIRED)
        context.set_default_verify_paths()
        return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def wrap_socket(self, sock):
        """Wrap a socket in an SSL context (see `ssl.wrap_socket`)

        :param socket: Plain socket
        :type socket: :class:`socket.socket`
        """
        if self._wrap_socket is None:
            if hasattr(ssl, 'SSLContext'):
                ssl_context = ssl.create_default_context(cafile=self.cafile)
                ssl_context.check_hostname = False
                if self.certfile is not None:
                    ssl_context.load_cert_chain(certfile=self.certfile,
                                                keyfile=self.keyfile,
                                                password=self.password)
                self._wrap_socket = ssl_context.wrap_socket
            else:
                self._wrap_socket = self._legacy_wrap_socket()
        return self._wrap_socket(sock)
项目:tvalacarta    作者:tvalacarta    | 项目源码 | 文件源码
def make_HTTPS_handler(params, **kwargs):
    opts_no_check_certificate = params.get('nocheckcertificate', False)
    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if opts_no_check_certificate:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)
    else:  # Python < 3.4
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (ssl.CERT_NONE
                               if opts_no_check_certificate
                               else ssl.CERT_REQUIRED)
        context.set_default_verify_paths()
        return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
项目:scroll    作者:acidvegas    | 项目源码 | 文件源码
def create_socket(self):
        family = socket.AF_INET6 if config.connection.ipv6 else socket.AF_INET
        if config.connection.proxy:
            proxy_server, proxy_port = config.connection.proxy.split(':')
            self.sock = socks.socksocket(family, socket.SOCK_STREAM)
            self.sock.setblocking(0)
            self.sock.settimeout(15)
            self.sock.setproxy(socks.PROXY_TYPE_SOCKS5, proxy_server, int(proxy_port))
        else:
            self.sock = socket.socket(family, socket.SOCK_STREAM)
        if config.connection.vhost:
            self.sock.bind((config.connection.vhost, 0))
        if config.connection.ssl:
            ctx = ssl.create_default_context()
            if config.cert.file:
                ctx.load_cert_chain(config.cert.file, config.cert.key, config.cert.password)
            if config.connection.ssl_verify:
                ctx.verify_mode = ssl.CERT_REQUIRED
                ctx.load_default_certs()
            else:
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
            self.sock = ctx.wrap_socket(self.sock)
项目:xunfeng    作者:ysrc    | 项目源码 | 文件源码
def is_SOAP(ip, port, timeout):
    try:
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        output = urllib2.urlopen('https://'+ip+":"+str(port), context=ctx, timeout=timeout).read()
        if "rO0AB" in output:return (1, True)
    except urllib2.HTTPError, e:
        if ((e.getcode() == 500) and ("rO0AB" in e.read())):return (1, True)
    except:pass
    try:
        output = urllib2.urlopen('http://'+ip+":"+str(port), timeout=timeout).read()
        if "rO0AB" in output:return (0, True)
    except urllib2.HTTPError, e:
        if ((e.getcode() == 500) and ("rO0AB" in e.read())):return (0, True)
    except:pass
    return (2, False)
项目:bawk    作者:jttwnsnd    | 项目源码 | 文件源码
def _create_ssl_ctx(self, sslp):
        if isinstance(sslp, ssl.SSLContext):
            return sslp
        ca = sslp.get('ca')
        capath = sslp.get('capath')
        hasnoca = ca is None and capath is None
        ctx = ssl.create_default_context(cafile=ca, capath=capath)
        ctx.check_hostname = not hasnoca and sslp.get('check_hostname', True)
        ctx.verify_mode = ssl.CERT_NONE if hasnoca else ssl.CERT_REQUIRED
        if 'cert' in sslp:
            ctx.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
        if 'cipher' in sslp:
            ctx.set_ciphers(sslp['cipher'])
        ctx.options |= ssl.OP_NO_SSLv2
        ctx.options |= ssl.OP_NO_SSLv3
        return ctx
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def make_https_server(case, context=None, certfile=CERTFILE,
                      host=HOST, handler_class=None):
    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)
    server = HTTPSServerThread(context, host, handler_class)
    flag = threading.Event()
    server.start(flag)
    flag.wait()
    def cleanup():
        if support.verbose:
            sys.stdout.write('stopping HTTPS server\n')
        server.stop()
        if support.verbose:
            sys.stdout.write('joining HTTPS thread\n')
        server.join()
    case.addCleanup(cleanup)
    return server
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def make_https_server(case, context=None, certfile=CERTFILE,
                      host=HOST, handler_class=None):
    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)
    server = HTTPSServerThread(context, host, handler_class)
    flag = threading.Event()
    server.start(flag)
    flag.wait()
    def cleanup():
        if support.verbose:
            sys.stdout.write('stopping HTTPS server\n')
        server.stop()
        if support.verbose:
            sys.stdout.write('joining HTTPS thread\n')
        server.join()
    case.addCleanup(cleanup)
    return server
项目:kodi-plugin.video.ted-talks-chinese    作者:daineseh    | 项目源码 | 文件源码
def make_HTTPS_handler(params, **kwargs):
    opts_no_check_certificate = params.get('nocheckcertificate', False)
    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if opts_no_check_certificate:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)
    else:  # Python < 3.4
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (ssl.CERT_NONE
                               if opts_no_check_certificate
                               else ssl.CERT_REQUIRED)
        context.set_default_verify_paths()
        return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def init_from_url(self, snapshot=-1, thingpedia_url=None):
        if thingpedia_url is None:
            thingpedia_url = os.getenv('THINGPEDIA_URL', 'https://thingpedia.stanford.edu/thingpedia')
        ssl_context = ssl.create_default_context()

        with urllib.request.urlopen(thingpedia_url + '/api/snapshot/' + str(snapshot) + '?meta=1', context=ssl_context) as res:
            self._process_devices(json.load(res)['data'])

        with urllib.request.urlopen(thingpedia_url + '/api/entities?snapshot=' + str(snapshot), context=ssl_context) as res:
            self._process_entities(json.load(res)['data'])
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def run():
    np.random.seed(42)
    config = ServerConfig.load(('./server.conf',))

    if sys.version_info[2] >= 6:
        thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
    else:
        thread_pool = ThreadPoolExecutor(max_workers=32)
    app = Application(config, thread_pool)

    if config.ssl_key:
        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
        app.listen(config.port, ssl_options=ssl_ctx)
    else:
        app.listen(config.port)

    if config.user:
        os.setgid(grp.getgrnam(config.user)[2])
        os.setuid(pwd.getpwnam(config.user)[2])

    if sd:
        sd.notify('READY=1')

    tokenizer_service = TokenizerService()
    tokenizer_service.run()

    for language in config.languages:
        load_language(app, tokenizer_service, language, config.get_model_directory(language))

    sys.stdout.flush()
    tornado.ioloop.IOLoop.current().start()
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def _get_ssl(self, cert=None):
        return ssl.create_default_context(
            purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def __init__(self, output_spec):
        """
        Uses HTTP chunked transfer-encoding to stream a request body to a
        server. Unfortunately requests does not support hooking into this logic
        easily, so we use the lower-level httplib module.
        """
        super(HttpStreamPushAdapter, self).__init__(output_spec)
        self._closed = False

        parts = urlparse.urlparse(output_spec['url'])
        if parts.scheme == 'https':
            ssl_context = ssl.create_default_context()
            conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
        else:
            conn = httplib.HTTPConnection(parts.netloc)

        try:
            conn.putrequest(output_spec.get('method', 'POST').upper(),
                            parts.path, skip_accept_encoding=True)

            for header, value in output_spec.get('headers', {}).items():
                conn.putheader(header, value)

            conn.putheader('Transfer-Encoding', 'chunked')
            conn.endheaders()  # This actually flushes the headers to the server
        except Exception:
            print('HTTP connection to "%s" failed.' % output_spec['url'])
            conn.close()
            raise

        self.conn = conn
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def __init__(self, url, headers={}):
        self._url = url

        """
        Uses HTTP chunked transfer-encoding to stream a request body to a
        server. Unfortunately requests does not support hooking into this logic
        easily, so we use the lower-level httplib module.
        """
        self._closed = False

        parts = urlparse.urlparse(self._url)
        if parts.scheme == 'https':
            ssl_context = ssl.create_default_context()
            conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
        else:
            conn = httplib.HTTPConnection(parts.netloc)

        try:
            url = parts.path
            if parts.query is not None:
                url = '%s?%s' % (url, parts.query)
            conn.putrequest('POST',
                            url, skip_accept_encoding=True)

            for header, value in headers.items():
                conn.putheader(header, value)

            conn.putheader('Transfer-Encoding', 'chunked')

            conn.endheaders()  # This actually flushes the headers to the server
        except Exception:
            sys.stderr.write('HTTP connection to "%s" failed.\n' % self._url)
            conn.close()
            raise

        self.conn = conn
项目:socketshark    作者:closeio    | 项目源码 | 文件源码
def http_post(shark, url, data):
    log = shark.log.bind(url=url)
    opts = shark.config['HTTP']
    if opts.get('ssl_cafile'):
        ssl_context = ssl.create_default_context(cafile=opts['ssl_cafile'])
    else:
        ssl_context = None
    conn = aiohttp.TCPConnector(ssl_context=ssl_context)
    async with aiohttp.ClientSession(connector=conn) as session:
        wait = opts['wait']
        for n in range(opts['tries']):
            if n > 0:
                await asyncio.sleep(wait)
            try:
                log.debug('http request', data=data)
                async with session.post(url, json=data,
                                        timeout=opts['timeout']) as resp:
                    if resp.status == 429:  # Too many requests.
                        wait = _get_rate_limit_wait(log, resp, opts)
                        continue
                    else:
                        wait = opts['wait']
                    resp.raise_for_status()
                    data = await resp.json()
                    log.debug('http response', data=data)
                    return data
            except aiohttp.ClientError:
                log.exception('unhandled exception in http_post')
            except asyncio.TimeoutError:
                log.exception('timeout in http_post')
        return {'status': 'error', 'error': c.ERR_SERVICE_UNAVAILABLE}
项目:socketshark    作者:closeio    | 项目源码 | 文件源码
def get_ssl_context(self):
        ssl_settings = self.config.get('WS_SSL')
        if ssl_settings:
            ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_context.load_cert_chain(certfile=ssl_settings['cert'],
                                        keyfile=ssl_settings['key'])
            return ssl_context
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_check_hostname(self):
        # Test that server_hostname parameter to start_tls is being used.
        # The check_hostname functionality is only available in python 2.7 and
        # up and in python 3.4 and up.
        server_future = self.server_start_tls(_server_ssl_options())
        client_future = self.client_start_tls(
            ssl.create_default_context(),
            server_hostname=b'127.0.0.1')
        with ExpectLog(gen_log, "SSL Error"):
            with self.assertRaises(ssl.SSLError):
                # The client fails to connect with an SSL error.
                yield client_future
        with self.assertRaises(Exception):
            # The server fails to connect, but the exact error is unspecified.
            yield server_future
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_check_hostname(self):
        # Test that server_hostname parameter to start_tls is being used.
        # The check_hostname functionality is only available in python 2.7 and
        # up and in python 3.4 and up.
        server_future = self.server_start_tls(_server_ssl_options())
        client_future = self.client_start_tls(
            ssl.create_default_context(),
            server_hostname=b'127.0.0.1')
        with ExpectLog(gen_log, "SSL Error"):
            with self.assertRaises(ssl.SSLError):
                # The client fails to connect with an SSL error.
                yield client_future
        with self.assertRaises(Exception):
            # The server fails to connect, but the exact error is unspecified.
            yield server_future
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_check_hostname(self):
        # Test that server_hostname parameter to start_tls is being used.
        # The check_hostname functionality is only available in python 2.7 and
        # up and in python 3.4 and up.
        server_future = self.server_start_tls(_server_ssl_options())
        client_future = self.client_start_tls(
            ssl.create_default_context(),
            server_hostname=b'127.0.0.1')
        with ExpectLog(gen_log, "SSL Error"):
            with self.assertRaises(ssl.SSLError):
                # The client fails to connect with an SSL error.
                yield client_future
        with self.assertRaises(Exception):
            # The server fails to connect, but the exact error is unspecified.
            yield server_future
项目:python-keylime    作者:mit-ll    | 项目源码 | 文件源码
def get_tls_context(self):
        ca_cert = config.get('tenant', 'ca_cert')
        my_cert = config.get('tenant', 'my_cert')
        my_priv_key = config.get('tenant', 'private_key')
        my_key_pw = config.get('tenant','private_key_pw')

        tls_dir = config.get('tenant','tls_dir')

        if tls_dir == 'default':
            ca_cert = 'cacert.crt'
            my_cert = 'client-cert.crt'
            my_priv_key = 'client-private.pem'
            tls_dir = 'cv_ca'

        # this is relative path, convert to absolute in WORK_DIR
        if tls_dir[0]!='/':
            tls_dir = os.path.abspath('%s/%s'%(common.WORK_DIR,tls_dir))

        if my_key_pw=='default':
            logger.warning("CAUTION: using default password for private key, please set private_key_pw to a strong password")

        logger.info("Setting up client TLS in %s"%(tls_dir))

        ca_path = "%s/%s"%(tls_dir,ca_cert)
        my_cert = "%s/%s"%(tls_dir,my_cert)
        my_priv_key = "%s/%s"%(tls_dir,my_priv_key)

        context = ssl.create_default_context()
        context.load_verify_locations(cafile=ca_path)   
        context.load_cert_chain(certfile=my_cert,keyfile=my_priv_key,password=my_key_pw)
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = config.getboolean('general','tls_check_hostnames')
        return context
项目:Python4Pentesters    作者:tatanus    | 项目源码 | 文件源码
def test_dir(url, depth, max_depth, dir_list):
    if (depth > max_depth):
        return
    depth = depth +1

    for d in dir_list:
        new_url = url + "/" + d
        try:
            # make a SSL handler that ignores SSL CERT issues
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE

            response = urllib2.urlopen(new_url, context=ctx)
            if response and response.getcode() == 200:
                print "[+] FOUND %s" % (new_url)
                thread.start_new_thread(test_dir, (new_url, depth, max_depth, dir_list, ))
        except urllib2.HTTPError, e:
            if e.code == 401:
                print "[!] Authorization Required %s " % (new_url)
            elif e.code == 403:
                print "[!] Forbidden %s " % (new_url)
            elif e.code == 404:
                print "[-] Not Found %s " % (new_url)
            elif e.code == 503:
                print "[!] Service Unavailable %s " % (new_url)
            else:
                print "[?] Unknwon"
项目:Python4Pentesters    作者:tatanus    | 项目源码 | 文件源码
def test_dir(url, depth, max_depth, dir_list):
    if (depth > max_depth):
        return
    depth = depth +1

    found = list()
    for d in dir_list:
        new_url = url + "/" + d
        try:
            # make a SSL handler that ignores SSL CERT issues
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE

            response = urllib2.urlopen(new_url, context=ctx)
            if response and response.getcode() == 200:
                print "[+] FOUND %s" % (new_url)
                found.append(new_url)
        except urllib2.HTTPError, e:
            if e.code == 401:
                print "[!] Authorization Required %s " % (new_url)
            elif e.code == 403:
                print "[!] Forbidden %s " % (new_url)
            elif e.code == 404:
                print "[-] Not Found %s " % (new_url)
            elif e.code == 503:
                print "[!] Service Unavailable %s " % (new_url)
            else:
                print "[?] Unknwon"

    # loop over each identified valid directory and recurse into it
    for new_url in found:
        test_dir(new_url, depth, max_depth, dir_list)
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def echo_client(server_address, messages):

    log = logging.getLogger('echo_client')

    # The certificate is created with pymotw.com as the hostname,
    # which will not match when the example code runs
    # elsewhere, so disable hostname verification.
    ssl_context = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH,
    )
    ssl_context.check_hostname = False
    ssl_context.load_verify_locations('pymotw.crt')

    log.debug('connecting to {} port {}'.format(*server_address))
    reader, writer = await asyncio.open_connection(
        *server_address, ssl=ssl_context)

    # This could be writer.writelines() except that
    # would make it harder to show each part of the message
    # being sent.
    for msg in messages:
        writer.write(msg)
        log.debug('sending {!r}'.format(msg))
    # SSL does not support EOF, so send a null byte to indicate
    # the end of the message.
    writer.write(b'\x00')
    await writer.drain()

    log.debug('waiting for response')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('received {!r}'.format(data))
        else:
            log.debug('closing')
            writer.close()
            return
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def _test_create_ssl_connection(self, httpd, create_connection,
                                    check_sockname=True):
        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
        self._basetest_create_ssl_connection(conn_fut, check_sockname)

        # ssl.Purpose was introduced in Python 3.4
        if hasattr(ssl, 'Purpose'):
            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
                                          cafile=None, capath=None,
                                          cadata=None):
                """
                A ssl.create_default_context() replacement that doesn't enable
                cert validation.
                """
                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
                return test_utils.dummy_ssl_context()

            # With ssl=True, ssl.create_default_context() should be called
            with mock.patch('ssl.create_default_context',
                            side_effect=_dummy_ssl_create_context) as m:
                conn_fut = create_connection(ssl=True)
                self._basetest_create_ssl_connection(conn_fut, check_sockname)
                self.assertEqual(m.call_count, 1)

        # With the real ssl.create_default_context(), certificate
        # validation will fail
        with self.assertRaises(ssl.SSLError) as cm:
            conn_fut = create_connection(ssl=True)
            # Ignore the "SSL handshake failed" log in debug mode
            with test_utils.disable_logger():
                self._basetest_create_ssl_connection(conn_fut, check_sockname)

        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')