Python urllib3 模块:ProxyManager() 实例源码

我们从 Python 开源项目中提取了以下 10 个代码示例,用于说明如何使用 urllib3.ProxyManager()。

项目:Java-Application-Exploits    作者:KBZ-Infosec    | 项目源码 | 文件源码
def configure_http_pool():
    """Build the shared urllib3 pool and store it in the global ``gl_http_pool``.

    Settings are read from the module-level ``gl_args``:

    * ``mode``: 'auto-scan' and 'file-scan' use a 1.0s connect / 3.0s read
      timeout; every other mode uses ``gl_args.timeout`` for connect and
      6.0s for read.
    * ``proxy``: when set, a ``ProxyManager`` is created, otherwise a plain
      ``PoolManager``.
    * ``proxy_cred``: when set, it is turned into proxy basic-auth headers.
    * ``host``: only inspected (when a proxy is used) to check that it
      contains a protocol.

    Certificate verification is disabled (``cert_reqs='CERT_NONE'``) for
    every pool created here.

    Raises:
        SystemExit: with status 1 when a proxy is used but the host or the
            proxy lacks 'http', or when building the proxy manager fails.
    """
    global gl_http_pool

    if gl_args.mode in ('auto-scan', 'file-scan'):
        timeout = Timeout(connect=1.0, read=3.0)
    else:
        timeout = Timeout(connect=gl_args.timeout, read=6.0)

    if not gl_args.proxy:
        gl_http_pool = PoolManager(timeout=timeout, cert_reqs='CERT_NONE')
        return

    # When using a proxy, the protocol must be part of both values.
    host_lacks_protocol = gl_args.host is not None and 'http' not in gl_args.host
    if host_lacks_protocol or 'http' not in gl_args.proxy:
        # Report whichever value is missing the protocol. The host may be
        # None here, so it must not be searched again unconditionally.
        offender = gl_args.host if host_lacks_protocol else gl_args.proxy
        print_and_flush(RED + " * When using proxy, you must specify the http or https protocol"
                              " (eg. http://%s).\n\n" % offender + ENDC)
        logging.critical('Protocol not specified')
        raise SystemExit(1)

    proxy_kwargs = {
        'proxy_url': gl_args.proxy,
        'timeout': timeout,
        'cert_reqs': 'CERT_NONE',
    }
    try:
        if gl_args.proxy_cred:
            proxy_kwargs['proxy_headers'] = make_headers(proxy_basic_auth=gl_args.proxy_cred)
        gl_http_pool = ProxyManager(**proxy_kwargs)
    except Exception:
        # Exception (not a bare except) so Ctrl-C and SystemExit still propagate.
        print_and_flush(RED + " * An error occurred while setting the proxy. Please see log for details..\n\n" + ENDC)
        logging.critical('Error while setting the proxy', exc_info=True)
        raise SystemExit(1)
项目:apm-agent-python    作者:elastic    | 项目源码 | 文件源码
def __init__(self, parsed_url, **kwargs):
    """Initialise the base transport and create the urllib3 manager in ``self.http``.

    Certificates are required and checked against certifi's CA bundle unless
    ``self._verify_server_cert`` is falsy, in which case certificate checks
    and hostname assertion are switched off. A ``ProxyManager`` is used when
    the ``HTTPS_PROXY`` environment variable (or ``HTTP_PROXY`` if the former
    is not set) holds a non-empty value; otherwise a ``PoolManager`` is used.

    :param parsed_url: forwarded unchanged to the parent constructor
    :param kwargs: forwarded unchanged to the parent constructor
    """
    super(Transport, self).__init__(parsed_url, **kwargs)

    manager_options = dict(
        cert_reqs='CERT_REQUIRED',
        ca_certs=certifi.where(),
        block=True,
    )
    if not self._verify_server_cert:
        manager_options.update(cert_reqs=ssl.CERT_NONE, assert_hostname=False)

    # HTTP_PROXY is only the fallback for an *unset* HTTPS_PROXY.
    http_proxy = os.environ.get('HTTP_PROXY')
    proxy = os.environ.get('HTTPS_PROXY', http_proxy)

    if not proxy:
        self.http = urllib3.PoolManager(**manager_options)
    else:
        self.http = urllib3.ProxyManager(proxy, **manager_options)
项目:OpenDoor    作者:stanislav-web    | 项目源码 | 文件源码
def __proxy_pool(self):
    """
    Create a proxy connection pool manager.

    The proxy server is the configured one when ``is_standalone_proxy`` is
    exactly True, otherwise the result of ``__get_random_proxy()``. For a
    'socks' proxy the SOCKS manager class is imported lazily from
    ``urllib3.contrib.socks`` and cached on the instance; insecure-request
    warnings are disabled in that case.

    :raise ProxyRequestError: on DependencyWarning, ProxySchemeUnknown or ImportError
    :return: ProxyManager, or SOCKSProxyManager for socks proxies
    """

    try:
        if self.__cfg.is_standalone_proxy is True:
            self.__server = self.__cfg.proxy
        else:
            self.__server = self.__get_random_proxy()

        if self.__get_proxy_type(self.__server) == 'socks':
            disable_warnings(InsecureRequestWarning)

            # hasattr(self, '__pm') can never match inside a class body: the
            # attribute is stored under its name-mangled form while the
            # string literal is not mangled. Reading the attribute directly
            # lets the cached manager class actually be reused.
            try:
                manager_cls = self.__pm
            except AttributeError:
                manager_cls = None

            if manager_cls is None:
                socks_module = importlib.import_module('urllib3.contrib.socks')
                manager_cls = socks_module.SOCKSProxyManager
                self.__pm = manager_cls
        else:
            manager_cls = ProxyManager

        return manager_cls(self.__server,
                           num_pools=self.__cfg.threads,
                           timeout=Timeout(self.__cfg.timeout, read=self.__cfg.timeout),
                           block=True)
    except (DependencyWarning, ProxySchemeUnknown, ImportError) as error:
        raise ProxyRequestError(error)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def test_instance_and_callable(self):
    """Both a proxy-info object and a callable returning one yield a ProxyManager pool."""
    info = httplib2.proxy_info_from_url('http://myproxy.example.com')

    # First the object itself, then a zero-argument callable producing it.
    for candidate in (info, lambda: info):
        client = httplib2.Http(proxy_info=candidate)
        self.assertIsInstance(client.pool, urllib3.ProxyManager)
项目:httplib2shim    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def _default_make_pool(http, proxy_info):
    """Create the urllib3 manager used by ``http``.

    If ``http.ca_certs`` is not set it is filled in from
    ``_certifi_where_for_ssl_version()``. Certificates are required unless
    ``http.disable_ssl_certificate_validation`` is truthy.

    Args:
        http: object providing ``ca_certs`` (may be updated in place) and
            ``disable_ssl_certificate_validation``.
        proxy_info: falsy for no proxy, an object with ``proxy_host``,
            ``proxy_port``, ``proxy_user`` and ``proxy_pass`` attributes, or
            a zero-argument callable returning either of those.

    Returns:
        A ``urllib3.ProxyManager`` when proxy information is available,
        otherwise a ``urllib3.PoolManager``.
    """

    if not http.ca_certs:
        http.ca_certs = _certifi_where_for_ssl_version()

    ssl_disabled = http.disable_ssl_certificate_validation

    cert_reqs = 'CERT_REQUIRED' if http.ca_certs and not ssl_disabled else None

    # ``collections.Callable`` was removed in Python 3.10; the builtin
    # ``callable()`` performs the same check on every supported version.
    if callable(proxy_info):
        proxy_info = proxy_info()

    if proxy_info:
        if proxy_info.proxy_user and proxy_info.proxy_pass:
            # NOTE(review): credentials are interpolated without
            # percent-encoding; values containing URL delimiters may need
            # quoting - confirm against how urllib3 parses proxy_url.
            proxy_url = 'http://{}:{}@{}:{}/'.format(
                proxy_info.proxy_user, proxy_info.proxy_pass,
                proxy_info.proxy_host, proxy_info.proxy_port,
            )
            proxy_headers = urllib3.util.request.make_headers(
                proxy_basic_auth='{}:{}'.format(
                    proxy_info.proxy_user, proxy_info.proxy_pass,
                )
            )
        else:
            proxy_url = 'http://{}:{}/'.format(
                proxy_info.proxy_host, proxy_info.proxy_port,
            )
            proxy_headers = {}

        return urllib3.ProxyManager(
            proxy_url=proxy_url,
            proxy_headers=proxy_headers,
            ca_certs=http.ca_certs,
            cert_reqs=cert_reqs,
        )

    return urllib3.PoolManager(
        ca_certs=http.ca_certs,
        cert_reqs=cert_reqs,
    )
项目:client-python    作者:kubernetes-incubator    | 项目源码 | 文件源码
def __init__(self, configuration, pools_size=4, maxsize=None):
    """Create ``self.pool_manager`` from the given configuration.

    urllib3.PoolManager passes all keyword parameters on to the connection pool:
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680
    Custom SSL certificates and client certificates:
    http://urllib3.readthedocs.io/en/latest/advanced-usage.html

    :param configuration: object providing verify_ssl, ssl_ca_cert,
        assert_hostname, connection_pool_maxsize, proxy, cert_file and key_file
    :param pools_size: passed to the manager as ``num_pools``
    :param maxsize: number of requests to host that are allowed in parallel;
        when None, ``configuration.connection_pool_maxsize`` is used, or 4 if
        that is None as well
    """
    # Require certificates only when verification is enabled.
    cert_reqs = ssl.CERT_REQUIRED if configuration.verify_ssl else ssl.CERT_NONE

    # Without an explicit certificate file, use Mozilla's root certificates.
    ca_certs = configuration.ssl_ca_cert or certifi.where()

    hostname_kwargs = {}
    hostname_setting = configuration.assert_hostname
    if hostname_setting is not None:
        hostname_kwargs['assert_hostname'] = hostname_setting

    if maxsize is None:
        configured_maxsize = configuration.connection_pool_maxsize
        maxsize = 4 if configured_maxsize is None else configured_maxsize

    # Pick the manager class first, then feed both variants the same keywords.
    if configuration.proxy:
        manager_cls = urllib3.ProxyManager
    else:
        manager_cls = urllib3.PoolManager

    manager_kwargs = {
        'num_pools': pools_size,
        'maxsize': maxsize,
        'cert_reqs': cert_reqs,
        'ca_certs': ca_certs,
        'cert_file': configuration.cert_file,
        'key_file': configuration.key_file,
    }
    if manager_cls is urllib3.ProxyManager:
        manager_kwargs['proxy_url'] = configuration.proxy
    manager_kwargs.update(hostname_kwargs)

    self.pool_manager = manager_cls(**manager_kwargs)
项目:feersum-nlu-api-wrappers    作者:praekelt    | 项目源码 | 文件源码
def __init__(self, configuration, pools_size=4, maxsize=None):
    """Build the urllib3 manager for this client and keep it in ``self.pool_manager``.

    All keyword parameters given to urllib3.PoolManager are handed on to the
    connection pool, see:
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75  # noqa: E501
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680  # noqa: E501
    Custom SSL certificates and client certificates:
    http://urllib3.readthedocs.io/en/latest/advanced-usage.html  # noqa: E501

    :param configuration: settings object; verify_ssl, ssl_ca_cert,
        assert_hostname, connection_pool_maxsize, proxy, cert_file and
        key_file are read from it
    :param pools_size: value for the manager's ``num_pools``
    :param maxsize: number of requests to host that are allowed in
        parallel; None falls back to the configured value, then to 4
    """
    # cert_reqs
    verify_mode = ssl.CERT_NONE
    if configuration.verify_ssl:
        verify_mode = ssl.CERT_REQUIRED

    # ca_certs: fall back to Mozilla's root certificates when none is set
    ca_bundle = configuration.ssl_ca_cert
    if not ca_bundle:
        ca_bundle = certifi.where()

    if configuration.assert_hostname is None:
        extra_pool_args = {}
    else:
        extra_pool_args = {'assert_hostname': configuration.assert_hostname}

    if maxsize is None:
        maxsize = configuration.connection_pool_maxsize
        if maxsize is None:
            maxsize = 4

    # https pool manager
    proxy = configuration.proxy
    if not proxy:
        self.pool_manager = urllib3.PoolManager(
            num_pools=pools_size,
            maxsize=maxsize,
            cert_reqs=verify_mode,
            ca_certs=ca_bundle,
            cert_file=configuration.cert_file,
            key_file=configuration.key_file,
            **extra_pool_args
        )
        return

    self.pool_manager = urllib3.ProxyManager(
        num_pools=pools_size,
        maxsize=maxsize,
        cert_reqs=verify_mode,
        ca_certs=ca_bundle,
        cert_file=configuration.cert_file,
        key_file=configuration.key_file,
        proxy_url=proxy,
        **extra_pool_args
    )
项目:kubeResource    作者:jonathan-kosgei    | 项目源码 | 文件源码
def __init__(self, pools_size=4, maxsize=4):
    """Create ``self.pool_manager`` from the settings returned by ``Configuration()``.

    urllib3.PoolManager will pass all kw parameters to connectionpool:
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680
    ca_certs vs cert_file vs key_file: http://stackoverflow.com/a/23957365/2985775

    :param pools_size: passed to the manager as ``num_pools``
    :param maxsize: the number of requests to host that are allowed in parallel
    """
    # Fetch the configuration once instead of re-creating it for every
    # attribute read. NOTE(review): presumably Configuration() yields the
    # same shared settings on each call - confirm against its definition.
    config = Configuration()

    # Require certificates only when verification is enabled.
    cert_reqs = ssl.CERT_REQUIRED if config.verify_ssl else ssl.CERT_NONE

    # If no certificate file is set, use Mozilla's root certificates.
    ca_certs = config.ssl_ca_cert or certifi.where()

    # Keyword arguments shared by the proxied and the direct manager.
    pool_kwargs = {
        'num_pools': pools_size,
        'maxsize': maxsize,
        'cert_reqs': cert_reqs,
        'ca_certs': ca_certs,
        'cert_file': config.cert_file,
        'key_file': config.key_file,
    }

    # https pool manager
    proxy = config.proxy
    if proxy:
        self.pool_manager = urllib3.ProxyManager(proxy_url=proxy, **pool_kwargs)
    else:
        self.pool_manager = urllib3.PoolManager(**pool_kwargs)
项目:data.world-py    作者:datadotworld    | 项目源码 | 文件源码
def __init__(self, pools_size=4, maxsize=4):
    """Set up the urllib3 manager stored in ``self.pool_manager``.

    Settings (verify_ssl, ssl_ca_cert, cert_file, key_file, proxy) are read
    from ``Configuration()``. urllib3.PoolManager will pass all kw
    parameters to connectionpool:
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680
    ca_certs vs cert_file vs key_file: http://stackoverflow.com/a/23957365/2985775

    :param pools_size: value for the manager's ``num_pools``
    :param maxsize: the number of requests to host that are allowed in parallel
    """
    # One Configuration() call instead of one per attribute read.
    # NOTE(review): assumes repeated Configuration() calls expose the same
    # settings - confirm against its definition.
    settings = Configuration()

    # cert_reqs
    if settings.verify_ssl:
        cert_reqs = ssl.CERT_REQUIRED
    else:
        cert_reqs = ssl.CERT_NONE

    # ca_certs: if no certificate file is set, use Mozilla's root certificates.
    ca_certs = settings.ssl_ca_cert
    if not ca_certs:
        ca_certs = certifi.where()

    # Both manager types take the same TLS and sizing keywords.
    shared_kwargs = dict(
        num_pools=pools_size,
        maxsize=maxsize,
        cert_reqs=cert_reqs,
        ca_certs=ca_certs,
        cert_file=settings.cert_file,
        key_file=settings.key_file,
    )

    # https pool manager
    if settings.proxy:
        shared_kwargs['proxy_url'] = settings.proxy
        self.pool_manager = urllib3.ProxyManager(**shared_kwargs)
    else:
        self.pool_manager = urllib3.PoolManager(**shared_kwargs)
项目:service-act    作者:nmdp-bioinformatics    | 项目源码 | 文件源码
def __init__(self, pools_size=4, maxsize=4):
    """Create the HTTP(S) manager and keep it in ``self.pool_manager``.

    A ``urllib3.ProxyManager`` is built when ``Configuration().proxy`` is
    truthy, otherwise a ``urllib3.PoolManager``. urllib3.PoolManager will
    pass all kw parameters to connectionpool:
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75
    https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680
    ca_certs vs cert_file vs key_file: http://stackoverflow.com/a/23957365/2985775

    :param pools_size: number of pools, passed as ``num_pools``
    :param maxsize: the number of requests to host that are allowed in parallel
    """
    # Read every setting from a single Configuration() result rather than
    # constructing it again for each attribute.
    # NOTE(review): presumably Configuration() always reflects the same
    # settings - confirm against its definition.
    cfg = Configuration()
    proxy = cfg.proxy

    manager_factory = urllib3.ProxyManager if proxy else urllib3.PoolManager

    factory_kwargs = {
        'num_pools': pools_size,
        'maxsize': maxsize,
        # Require certificates only when verification is enabled.
        'cert_reqs': ssl.CERT_REQUIRED if cfg.verify_ssl else ssl.CERT_NONE,
        # If no certificate file is set, use Mozilla's root certificates.
        'ca_certs': cfg.ssl_ca_cert if cfg.ssl_ca_cert else certifi.where(),
        'cert_file': cfg.cert_file,
        'key_file': cfg.key_file,
    }
    if proxy:
        # Only the proxy manager accepts a proxy_url.
        factory_kwargs['proxy_url'] = proxy

    # https pool manager
    self.pool_manager = manager_factory(**factory_kwargs)