我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用urllib3.HTTPConnectionPool()。
def __http_pool(self): """ Create HTTP connection pool :raise HttpRequestError :return: urllib3.HTTPConnectionPool """ try: pool = HTTPConnectionPool(self.__cfg.host, port=self.__cfg.port, maxsize=self.__cfg.threads, timeout=Timeout(self.__cfg.timeout, read=self.__cfg.timeout), block=True) if self._HTTP_DBG_LEVEL <= self.__debug.level: self.__debug.debug_connection_pool('http_pool_start', pool) return pool except Exception as error: raise HttpRequestError(str(error))
def get_etcd_connection(self): with self._etcd_url_lock: port = self._etcd_url_parts.port or 2379 if self._etcd_url_parts.scheme == "https": _log.debug("Getting new HTTPS connection to %s:%s", self._etcd_url_parts.hostname, port) pool = HTTPSConnectionPool(self._etcd_url_parts.hostname, port, key_file=self._etcd_key_file, cert_file=self._etcd_cert_file, ca_certs=self._etcd_ca_file, maxsize=1) else: _log.debug("Getting new HTTP connection to %s:%s", self._etcd_url_parts.hostname, port) pool = HTTPConnectionPool(self._etcd_url_parts.hostname, port, maxsize=1) return pool
def test_issue_etcd_request_basic_get(self): # Initialise the etcd URL. self.driver._handle_init({ MSG_KEY_ETCD_URLS: ["http://localhost:4001/"], MSG_KEY_HOSTNAME: "ourhost", MSG_KEY_KEY_FILE: None, MSG_KEY_CERT_FILE: None, MSG_KEY_CA_FILE: None }) m_pool = Mock(spec=HTTPConnectionPool) self.driver._issue_etcd_request(m_pool, "calico/v1/Ready") self.assertEqual( m_pool.request.mock_calls, [call("GET", "http://localhost:4001/v2/keys/calico/v1/Ready", fields=None, timeout=5, preload_content=True)] )
def test_issue_etcd_request_recursive_watch(self): # Initialise the etcd URL. self.driver._handle_init({ MSG_KEY_ETCD_URLS: ["http://localhost:4001/"], MSG_KEY_HOSTNAME: "ourhost", MSG_KEY_KEY_FILE: None, MSG_KEY_CERT_FILE: None, MSG_KEY_CA_FILE: None }) m_pool = Mock(spec=HTTPConnectionPool) self.driver._issue_etcd_request(m_pool, "calico/v1", timeout=10, wait_index=11, recursive=True) self.assertEqual( m_pool.request.mock_calls, [call("GET", "http://localhost:4001/v2/keys/calico/v1", fields={"recursive": "true", "wait": "true", "waitIndex": 11}, timeout=10, preload_content=False)] )
def __init__(self, host='localhost', port=9200, http_auth=None, use_ssl=False, verify_certs=False, ca_certs=None, client_cert=None, ssl_version=None, ssl_assert_hostname=None, ssl_assert_fingerprint=None, maxsize=10, **kwargs): super(Urllib3HttpConnection, self).__init__(host=host, port=port, **kwargs) self.headers = urllib3.make_headers(keep_alive=True) if http_auth is not None: if isinstance(http_auth, (tuple, list)): http_auth = ':'.join(http_auth) self.headers.update(urllib3.make_headers(basic_auth=http_auth)) pool_class = urllib3.HTTPConnectionPool kw = {} if use_ssl: pool_class = urllib3.HTTPSConnectionPool kw.update({ 'ssl_version': ssl_version, 'assert_hostname': ssl_assert_hostname, 'assert_fingerprint': ssl_assert_fingerprint, }) if verify_certs: kw.update({ 'cert_reqs': 'CERT_REQUIRED', 'ca_certs': ca_certs, 'cert_file': client_cert, }) elif ca_certs: raise ImproperlyConfigured("You cannot pass CA certificates when verify SSL is off.") else: warnings.warn( 'Connecting to %s using SSL with verify_certs=False is insecure.' % host) self.pool = pool_class(host, port=port, timeout=self.timeout, maxsize=maxsize, **kw)
def _get_connection_pool(): if DEBUG: return urllib3.HTTPConnectionPool('127.0.0.1:8000') else: return urllib3.HTTPSConnectionPool( 'api.nivad.io', cert_reqs='CERT_REQUIRED', ca_certs=certifi.where() )
def get(self, host, params=()): # type: (object, object) -> object """Get metadata by url""" self.__is_server_online(host) self.__disable_verbose() self.__parse_params(params) scheme, host = urlparse(host).scheme, urlparse(host).netloc self.DEFAULT_HTTP_PROTOCOL = scheme + "://" self.urls = self.__get_urls(host) response = {} self.HEADER['user-agent'] = self.reader.get_random_user_agent() log.info("user-agent : " + self.HEADER['user-agent']) log.info('Thread num : ' + str(self.threads)) try: httplib.HTTPConnection.debuglevel = self.debug if hasattr(urllib3, 'disable_warnings'): urllib3.disable_warnings() if scheme == "http": self.http = urllib3.HTTPConnectionPool(host.split(':')[0], port=80 if len(host.split(':')) == 1 else int( host.split(':')[1]), block=True, maxsize=10) elif scheme == "https": self.http = urllib3.HTTPSConnectionPool(host.split(':')[0], port=443 if len(host.split(':')) == 1 else int( host.split(':')[1]), block=True, maxsize=10) else: log.critical("not support http protocl, Exit now ") sys.exit(1); pool = threadpool.ThreadPool(self.threads) requests = threadpool.makeRequests(self.request, self.urls) for req in requests: pool.putRequest(req) time.sleep(1) pool.wait() except exceptions.AttributeError as e: log.critical(e.message) except KeyboardInterrupt: log.warning('Session canceled') sys.exit() self.counter['total'] = self.urls.__len__() self.counter['pools'] = pool.workers.__len__() response['count'] = self.counter response['result'] = self.result return response