我们从 Python 开源项目中，提取了以下 10 个代码示例，用于说明如何使用 urllib3.ProxyManager()。
def configure_http_pool():
    """Build the shared urllib3 pool and store it in the global ``gl_http_pool``.

    The timeout depends on ``gl_args.mode``: the 'auto-scan' and 'file-scan'
    modes use 1s connect / 3s read, every other mode uses ``gl_args.timeout``
    for connect and 6s for read.

    When ``gl_args.proxy`` is set, a ``ProxyManager`` is created (with
    basic-auth proxy headers if ``gl_args.proxy_cred`` is given); otherwise a
    plain ``PoolManager`` is used. Certificate verification is disabled in
    both cases (``cert_reqs='CERT_NONE'``).

    Exits the process with status 1 if a proxy is requested but the target
    host or the proxy does not contain 'http', or if the proxy manager cannot
    be created.
    """
    global gl_http_pool

    if gl_args.mode == 'auto-scan' or gl_args.mode == 'file-scan':
        timeout = Timeout(connect=1.0, read=3.0)
    else:
        timeout = Timeout(connect=gl_args.timeout, read=6.0)

    if gl_args.proxy:
        # when using proxy, protocol should be informed
        host_missing_scheme = gl_args.host is not None and 'http' not in gl_args.host
        if host_missing_scheme or 'http' not in gl_args.proxy:
            # Pick the value to show in the hint without touching gl_args.host
            # when it is None ('http' not in None would raise TypeError).
            offending = gl_args.host if host_missing_scheme else gl_args.proxy
            print_and_flush(RED + " * When using proxy, you must specify the http or https protocol"
                            " (eg. http://%s).\n\n" % offending + ENDC)
            logging.critical('Protocol not specified')
            exit(1)

        try:
            if gl_args.proxy_cred:
                headers = make_headers(proxy_basic_auth=gl_args.proxy_cred)
                gl_http_pool = ProxyManager(proxy_url=gl_args.proxy, proxy_headers=headers,
                                            timeout=timeout, cert_reqs='CERT_NONE')
            else:
                gl_http_pool = ProxyManager(proxy_url=gl_args.proxy, timeout=timeout,
                                            cert_reqs='CERT_NONE')
        except Exception:
            # Exception (not a bare except) so Ctrl-C / SystemExit propagate untouched.
            print_and_flush(RED + " * An error occurred while setting the proxy. Please see log for details..\n\n" + ENDC)
            # exc_info=True attaches the active exception's traceback to the log record.
            logging.critical('Error while setting the proxy', exc_info=True)
            exit(1)
    else:
        gl_http_pool = PoolManager(timeout=timeout, cert_reqs='CERT_NONE')
def __init__(self, parsed_url, **kwargs):
    """Initialise the transport and create its urllib3 connection manager.

    The parent initialiser is called first with ``parsed_url`` and
    ``kwargs``. Server certificates are checked against the certifi bundle
    unless ``self._verify_server_cert`` is falsy, in which case both
    certificate and hostname checks are switched off. If the ``HTTPS_PROXY``
    environment variable is present its value is used as the proxy URL,
    otherwise ``HTTP_PROXY`` is consulted.
    """
    super(Transport, self).__init__(parsed_url, **kwargs)

    if self._verify_server_cert:
        tls_options = {'cert_reqs': 'CERT_REQUIRED'}
    else:
        tls_options = {'cert_reqs': ssl.CERT_NONE, 'assert_hostname': False}
    manager_options = dict(tls_options, ca_certs=certifi.where(), block=True)

    # HTTP_PROXY is only the default for a *missing* HTTPS_PROXY; an
    # HTTPS_PROXY set to the empty string still wins and means "no proxy".
    environment = os.environ
    fallback_proxy = environment.get('HTTP_PROXY')
    proxy_url = environment.get('HTTPS_PROXY', fallback_proxy)

    if not proxy_url:
        self.http = urllib3.PoolManager(**manager_options)
    else:
        self.http = urllib3.ProxyManager(proxy_url, **manager_options)
def __proxy_pool(self):
    """
    Create Proxy connection pool

    The proxy server is ``self.__cfg.proxy`` when
    ``self.__cfg.is_standalone_proxy`` is True, otherwise the result of
    ``self.__get_random_proxy()``. The chosen server is stored on
    ``self.__server``.

    :raise ProxyRequestError: on DependencyWarning, ProxySchemeUnknown or ImportError
    :return: urllib3 ``ProxyManager``, or ``SOCKSProxyManager`` (from
        ``urllib3.contrib.socks``) when the proxy type is 'socks'
    """

    try:
        self.__server = self.__cfg.proxy if True is self.__cfg.is_standalone_proxy else self.__get_random_proxy()

        if self.__get_proxy_type(self.__server) == 'socks':
            disable_warnings(InsecureRequestWarning)
            # Read the cached class directly instead of hasattr(self, '__pm'):
            # when this method lives in a class body the attribute is stored
            # under its name-mangled form, so a lookup by the literal string
            # '__pm' never matches and the cache would never be hit.
            try:
                manager_cls = self.__pm
            except AttributeError:
                # Imported lazily so the SOCKS extra is only needed when used.
                package_module = importlib.import_module('urllib3.contrib.socks')
                manager_cls = getattr(package_module, 'SOCKSProxyManager')
                self.__pm = manager_cls
        else:
            manager_cls = ProxyManager

        # Same value is used for the connect and the read timeout.
        return manager_cls(self.__server,
                           num_pools=self.__cfg.threads,
                           timeout=Timeout(self.__cfg.timeout, read=self.__cfg.timeout),
                           block=True)
    except (DependencyWarning, ProxySchemeUnknown, ImportError) as error:
        raise ProxyRequestError(error)
def test_instance_and_callable(self):
    """Http builds a ProxyManager pool from a proxy_info object or a callable returning one."""
    proxy_info = httplib2.proxy_info_from_url('http://myproxy.example.com')

    # First the proxy info object itself, then a zero-argument callable
    # that produces it.
    for proxy_argument in (proxy_info, lambda: proxy_info):
        http = httplib2.Http(proxy_info=proxy_argument)
        self.assertIsInstance(http.pool, urllib3.ProxyManager)
def _default_make_pool(http, proxy_info):
    """Creates a urllib3.PoolManager object that has SSL verification enabled
    and uses the certifi certificates.

    :param http: object exposing ``ca_certs`` and
        ``disable_ssl_certificate_validation``. If ``ca_certs`` is falsy it is
        filled in (on ``http`` itself) from ``_certifi_where_for_ssl_version()``.
    :param proxy_info: falsy for a direct connection, a proxy description with
        ``proxy_host``/``proxy_port``/``proxy_user``/``proxy_pass`` attributes,
        or a zero-argument callable returning either.
    :return: ``urllib3.ProxyManager`` when proxy info is available, otherwise
        ``urllib3.PoolManager``.
    """
    if not http.ca_certs:
        http.ca_certs = _certifi_where_for_ssl_version()

    ssl_disabled = http.disable_ssl_certificate_validation
    # cert_reqs=None leaves certificate verification off.
    cert_reqs = 'CERT_REQUIRED' if http.ca_certs and not ssl_disabled else None

    # Builtin callable() instead of isinstance(..., collections.Callable):
    # the ABC aliases were removed from the ``collections`` namespace in
    # Python 3.10, where the old check raises AttributeError.
    if callable(proxy_info):
        proxy_info = proxy_info()

    if proxy_info:
        if proxy_info.proxy_user and proxy_info.proxy_pass:
            # NOTE(review): credentials are placed in the URL unescaped --
            # confirm behaviour for users/passwords with reserved characters.
            proxy_url = 'http://{}:{}@{}:{}/'.format(
                proxy_info.proxy_user, proxy_info.proxy_pass,
                proxy_info.proxy_host, proxy_info.proxy_port,
            )
            proxy_headers = urllib3.util.request.make_headers(
                proxy_basic_auth='{}:{}'.format(
                    proxy_info.proxy_user, proxy_info.proxy_pass,
                )
            )
        else:
            proxy_url = 'http://{}:{}/'.format(
                proxy_info.proxy_host, proxy_info.proxy_port,
            )
            proxy_headers = {}

        return urllib3.ProxyManager(
            proxy_url=proxy_url,
            proxy_headers=proxy_headers,
            ca_certs=http.ca_certs,
            cert_reqs=cert_reqs,
        )

    return urllib3.PoolManager(
        ca_certs=http.ca_certs,
        cert_reqs=cert_reqs,
    )
def __init__(self, configuration, pools_size=4, maxsize=None):
    """Create ``self.pool_manager`` from the given configuration.

    :param configuration: object supplying ``verify_ssl``, ``ssl_ca_cert``,
        ``assert_hostname``, ``connection_pool_maxsize``, ``proxy``,
        ``cert_file`` and ``key_file``.
    :param pools_size: forwarded to urllib3 as ``num_pools``.
    :param maxsize: forwarded to urllib3 as ``maxsize``; when None, falls back
        to ``configuration.connection_pool_maxsize`` and then to 4.
    """
    # urllib3.PoolManager will pass all kw parameters to connectionpool
    # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75
    # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680
    # Custom SSL certificates and client certificates:
    # http://urllib3.readthedocs.io/en/latest/advanced-usage.html

    verification = ssl.CERT_REQUIRED if configuration.verify_ssl else ssl.CERT_NONE

    # Without an explicit CA file, use Mozilla's root certificates.
    trust_store = configuration.ssl_ca_cert or certifi.where()

    # maxsize is the number of requests to host that are allowed in parallel
    if maxsize is None:
        configured_maxsize = configuration.connection_pool_maxsize
        maxsize = 4 if configured_maxsize is None else configured_maxsize

    # Keyword arguments shared by the proxied and the direct manager.
    manager_kwargs = {
        'num_pools': pools_size,
        'maxsize': maxsize,
        'cert_reqs': verification,
        'ca_certs': trust_store,
        'cert_file': configuration.cert_file,
        'key_file': configuration.key_file,
    }
    if configuration.assert_hostname is not None:
        manager_kwargs['assert_hostname'] = configuration.assert_hostname

    # https pool manager
    if configuration.proxy:
        self.pool_manager = urllib3.ProxyManager(
            proxy_url=configuration.proxy, **manager_kwargs
        )
    else:
        self.pool_manager = urllib3.PoolManager(**manager_kwargs)
def __init__(self, configuration, pools_size=4, maxsize=None):
    """Set up the urllib3 manager stored on ``self.pool_manager``.

    :param configuration: source of the TLS, proxy and pool-size settings
        (``verify_ssl``, ``ssl_ca_cert``, ``assert_hostname``,
        ``connection_pool_maxsize``, ``proxy``, ``cert_file``, ``key_file``).
    :param pools_size: value passed to urllib3 as ``num_pools``.
    :param maxsize: value passed to urllib3 as ``maxsize``. None means "use
        ``configuration.connection_pool_maxsize``", or 4 if that is None too.
    """
    # urllib3.PoolManager will pass all kw parameters to connectionpool
    # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75  # noqa: E501
    # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680  # noqa: E501
    # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html  # noqa: E501

    # maxsize is the number of requests to host that are allowed in parallel
    if maxsize is None:
        maxsize = configuration.connection_pool_maxsize
        if maxsize is None:
            maxsize = 4

    # cert_reqs
    cert_reqs = ssl.CERT_NONE
    if configuration.verify_ssl:
        cert_reqs = ssl.CERT_REQUIRED

    # ca_certs: if no certificate file is set, use Mozilla's root certificates.
    ca_certs = configuration.ssl_ca_cert
    if not ca_certs:
        ca_certs = certifi.where()

    pool_args = dict(
        num_pools=pools_size,
        maxsize=maxsize,
        cert_reqs=cert_reqs,
        ca_certs=ca_certs,
        cert_file=configuration.cert_file,
        key_file=configuration.key_file,
    )
    hostname_check = configuration.assert_hostname
    if hostname_check is not None:
        pool_args['assert_hostname'] = hostname_check

    # https pool manager: go through the proxy only when one is configured.
    proxy = configuration.proxy
    if not proxy:
        self.pool_manager = urllib3.PoolManager(**pool_args)
        return
    self.pool_manager = urllib3.ProxyManager(proxy_url=proxy, **pool_args)
def __init__(self, pools_size=4, maxsize=4):
    """Create ``self.pool_manager`` from the settings on ``Configuration()``.

    :param pools_size: forwarded to urllib3 as ``num_pools``.
    :param maxsize: forwarded to urllib3 as ``maxsize``.
    """
    # urllib3.PoolManager will pass all kw parameters to connectionpool
    # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75
    # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680
    # maxsize is the number of requests to host that are allowed in parallel
    # ca_certs vs cert_file vs key_file
    # http://stackoverflow.com/a/23957365/2985775

    # cert_reqs
    cert_reqs = ssl.CERT_REQUIRED if Configuration().verify_ssl else ssl.CERT_NONE

    # ca_certs: if no certificate file is set, use Mozilla's root certificates.
    ca_certs = Configuration().ssl_ca_cert
    if not ca_certs:
        ca_certs = certifi.where()

    # Keyword arguments common to both manager types.
    shared_kwargs = {
        'num_pools': pools_size,
        'maxsize': maxsize,
        'cert_reqs': cert_reqs,
        'ca_certs': ca_certs,
        'cert_file': Configuration().cert_file,
        'key_file': Configuration().key_file,
    }

    # https pool manager
    proxy = Configuration().proxy
    if proxy:
        self.pool_manager = urllib3.ProxyManager(proxy_url=proxy, **shared_kwargs)
    else:
        self.pool_manager = urllib3.PoolManager(**shared_kwargs)