我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用requests.adapters()。
def __init__(self, credentials, refresh_status_codes=transport.DEFAULT_REFRESH_STATUS_CODES, max_refresh_attempts=transport.DEFAULT_MAX_REFRESH_ATTEMPTS, refresh_timeout=None, **kwargs):
    """Create a session that carries credentials and a refresh policy.

    Extra keyword arguments are forwarded to the base ``Session``.
    """
    super(AuthorizedSession, self).__init__(**kwargs)
    self.credentials = credentials
    self._refresh_status_codes = refresh_status_codes
    self._max_refresh_attempts = max_refresh_attempts
    self._refresh_timeout = refresh_timeout
    # Dedicated session for credential-refresh traffic.  Mounting an
    # HTTPAdapter with max_retries makes those HTTP calls resilient to
    # transient network errors: requests that look safely retryable are
    # retried automatically.
    refresh_session = requests.Session()
    refresh_session.mount("https://", requests.adapters.HTTPAdapter(max_retries=3))
    # Request instance used by internal methods (e.g. credentials.refresh).
    # Passing `self` as the session here would lead to infinite recursion.
    self._auth_request = Request(refresh_session)
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
    """Initializes a urllib3 PoolManager.

    Not part of the public API; exposed only for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param connections: The number of urllib3 connection pools to cache.
    :param maxsize: The maximum number of connections to save in the pool.
    :param block: Block when no free connections are available.
    """
    # Remember the pool configuration so the adapter can be pickled
    # and rebuilt later with identical settings.
    self._pool_connections = connections
    self._pool_maxsize = maxsize
    self._pool_block = block
    self.poolmanager = AsyncPoolManager(
        num_pools=connections,
        maxsize=maxsize,
        block=block,
    )
    self.connections = []
def get_connection(self, url, proxies=None):
    """Returns a urllib3 connection for the given URL.

    Not part of the public API; exposed only for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used
        on this request.
    """
    # Scheme lookup must be case-insensitive, hence url.lower().
    proxy = (proxies or {}).get(urlparse(url.lower()).scheme)
    if proxy:
        proxy_headers = self.proxy_headers(proxy)
        if proxy not in self.proxy_manager:
            # Lazily create and cache one proxy manager per proxy URL.
            self.proxy_manager[proxy] = proxy_from_url(
                proxy,
                proxy_headers=proxy_headers,
                num_pools=self._pool_connections,
                maxsize=self._pool_maxsize,
                block=self._pool_block,
            )
        conn = self.proxy_manager[proxy].connection_from_url(url)
    else:
        # Only scheme should be lower case; re-serialize the parsed URL
        # rather than lower-casing the whole thing.
        conn = self.poolmanager.connection_from_url(urlparse(url).geturl())
    self.connections.append(conn)
    return conn
def __init__(self, endpoint=None, application_key=None, application_secret=None, consumer_key=None, timeout=TIMEOUT):
    """Build an API client bound to one endpoint, using a retrying session."""
    from requests import Session
    from requests.adapters import HTTPAdapter

    self._endpoint = ENDPOINTS[endpoint]
    self._application_key = application_key
    self._application_secret = application_secret
    self._consumer_key = consumer_key

    # Server/client time offset, fetched lazily on first use.
    self._time_delta = None

    try:
        # Some older versions of requests do not have the urllib3
        # vendorized package.
        from requests.packages.urllib3.util.retry import Retry
    except ImportError:
        retries = 5
    else:
        # Use a requests session to reuse connections between requests
        # and retry transient server-side failures.
        retries = Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[422, 500, 502, 503, 504],
        )
    self._session = Session()
    for scheme in ('https://', 'http://'):
        self._session.mount(scheme, HTTPAdapter(max_retries=retries))

    # Override default timeout
    self._timeout = timeout
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
):
    """Return a ``requests.Session`` whose adapters retry connection errors.

    A naive ``HTTPAdapter(max_retries=3)`` raises immediately on any
    connection error; as the requests API docs put it, "By default,
    Requests does not retry failed connections."  Building an explicit
    ``Retry`` policy with ``connect`` retries guards better on
    unpredictable networks.

    ``backoff_factor`` is documented in the urllib3 ``Retry`` reference:
    with ``retries=3`` and ``backoff_factor=0.3`` the sleeps are roughly
    ``[0.3, 0.6, 1.2]``.
    """  # noqa
    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    session = requests.Session()
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)
    for prefix in ('http://', 'https://'):
        session.mount(prefix, retrying_adapter)
    return session
def __init__(self):
    """Fetcher with a large HTTPS connection pool, retrying each task up to 3 times."""
    spider.Fetcher.__init__(self, max_repeat=3, sleep_time=0)
    self.session = requests.Session()
    pooled = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    self.session.mount('https://', pooled)
    self.clear_session()
def test_transport_adapter_ordering(self):
    """Mounted adapters must iterate longest-prefix-first."""
    s = requests.Session()
    assert list(s.adapters) == ['https://', 'http://']

    for prefix in ('http://git', 'http://github', 'http://github.com',
                   'http://github.com/about/'):
        s.mount(prefix, HTTPAdapter())
    assert list(s.adapters) == [
        'http://github.com/about/',
        'http://github.com',
        'http://github',
        'http://git',
        'https://',
        'http://',
    ]

    for prefix in ('http://gittip', 'http://gittip.com',
                   'http://gittip.com/about/'):
        s.mount(prefix, HTTPAdapter())
    assert list(s.adapters) == [
        'http://github.com/about/',
        'http://gittip.com/about/',
        'http://github.com',
        'http://gittip.com',
        'http://github',
        'http://gittip',
        'http://git',
        'https://',
        'http://',
    ]

    # Replacing the adapters dict wholesale must still allow mounting.
    s2 = requests.Session()
    s2.adapters = {'http://': HTTPAdapter()}
    s2.mount('https://', HTTPAdapter())
    assert 'http://' in s2.adapters
    assert 'https://' in s2.adapters
def test_session_close_proxy_clear(self, mocker):
    """Closing a session must clear every proxy manager on its adapters."""
    fake_managers = {
        'one': mocker.Mock(),
        'two': mocker.Mock(),
    }
    session = requests.Session()
    mocker.patch.dict(session.adapters['http://'].proxy_manager, fake_managers)
    session.close()
    for manager in fake_managers.values():
        manager.clear.assert_called_once_with()
def cancel(self):
    """Close, then cancel, every mounted adapter."""
    for adapter in self.adapters.values():
        adapter.close()
        adapter.cancel()
def run(self):
    """Drive the request loop: send everything, report progress, signal done."""
    self._executor = ThreadPoolExecutor(self.concurrency)
    # One pooled session sized to the concurrency level, shared by workers.
    self.session = requests.Session()
    pooled_adapter = requests.adapters.HTTPAdapter(
        pool_connections=self.concurrency,
        pool_maxsize=self.concurrency)
    self.session.mount('http://', pooled_adapter)
    self.session.mount('https://', pooled_adapter)
    self.session.verify = self.verify_ssl

    started = time()
    last_report = time()
    sent = 0
    result = None
    for result in self.perform_requests():
        if result is not True:
            sent += 1
            self.ui.info('{} responses sent | time elapsed {}s'
                         .format(sent, time() - started))
            # Periodically publish counters so the parent can track us.
            if time() - last_report > REPORT_INTERVAL:
                self.progress_queue.put((
                    ProgressQueueMsg.NETWORK_PROGRESS, {
                        "processed": self.n_requests,
                        "retried": self.n_retried,
                        "consumed": self.n_consumed,
                        "rusage": get_rusage(),
                    }))
                last_report = time()

    # Final summary; "ret" carries the last value perform_requests() yielded.
    self.progress_queue.put((ProgressQueueMsg.NETWORK_DONE, {
        "ret": result,
        "processed": self.n_requests,
        "retried": self.n_retried,
        "consumed": self.n_consumed,
        "rusage": get_rusage(),
    }))
def __init__(self, access_key=None, secret_key=None, https_proxy=None, insecure=False, endpoint=None, search_endpoint=None):
    """Initializes a new API connection.

    Args:
        access_key (str, optional): The user's Matchlight Public API
            access key. If not passed as an argument this value must
            be set using the ``MATCHLIGHT_ACCESS_KEY`` environment
            variable.
        secret_key (str, optional): The user's Matchlight Public API
            secret key. If not passed as an argument this value must
            be set using the ``MATCHLIGHT_SECRET_KEY`` environment
            variable.
        https_proxy (str): A string defining the HTTPS proxy to use.
            Defaults to None.
        insecure (bool, optional): Whether or not to verify
            certificates for the HTTPS proxy. Defaults to ``False``
            (certificates will be verified).
        endpoint (str, optional): Base URL for requests. Defaults to
            ``'https://api.matchlig.ht/api/v2'``.
        search_endpoint (str, optional): Base URL for all search API
            requests.

    Raises:
        matchlight.error.SDKError: If neither the arguments nor the
            environment variables supply both credentials.
    """
    # Fall back to environment variables for either missing credential.
    if access_key is None:
        access_key = os.environ.get('MATCHLIGHT_ACCESS_KEY', None)
    if secret_key is None:
        secret_key = os.environ.get('MATCHLIGHT_SECRET_KEY', None)
    if access_key is None or secret_key is None:
        raise matchlight.error.SDKError(
            'The APIConnection object requires your Matchlight '
            'API access_key and secret_key either be passed as input '
            'parameters or set in the MATCHLIGHT_ACCESS_KEY and '
            'MATCHLIGHT_SECRET_KEY environment variables.')
    # Both endpoints default to the same v2 base URL.
    if endpoint is None:
        endpoint = MATCHLIGHT_API_URL_V2
    if search_endpoint is None:
        search_endpoint = MATCHLIGHT_API_URL_V2
    self.access_key = access_key
    self.secret_key = secret_key
    self.proxy = {'https': https_proxy}
    self.insecure = insecure
    self.endpoint = endpoint
    self.search_endpoint = search_endpoint
    self.session = requests.Session()
    # Retry 5xx/502/503/504 responses up to 5 times for URLs under the
    # primary endpoint prefix.
    # NOTE(review): only `self.endpoint` gets the retry adapter; URLs under
    # `search_endpoint` use the default adapter unless the two prefixes
    # coincide (they do by default) — confirm this is intentional.
    self.session.mount(
        self.endpoint,
        requests.adapters.HTTPAdapter(
            max_retries=requests_urllib3.util.Retry(
                total=5,
                status_forcelist=[500, 502, 503, 504])),
    )