我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 requests.adapters.HTTPAdapter()。
def __init__(self, timeout=60, cache=False, max_retries=None, retry_interval=None):
    """Create the client and its underlying requests session.

    Args:
        timeout (float): The default global timeout (seconds).
        cache (bool): When truthy, the session is wrapped in ``CacheControl``.
        max_retries (int): Forwarded to ``Retry`` as ``total``.
        retry_interval (float): Forwarded to ``Retry`` as ``backoff_factor``.

    A retry policy is installed only when both ``max_retries`` and
    ``retry_interval`` are truthy.
    """
    self.timeout = timeout
    self.session = requests.session()

    wants_retries = bool(max_retries and retry_interval)
    if wants_retries:
        policy = Retry(total=max_retries, backoff_factor=retry_interval)
        # One adapter per scheme, both sharing the same policy object.
        for prefix in ('http://', 'https://'):
            self.session.mount(prefix, HTTPAdapter(max_retries=policy))

    if cache:
        self.session = CacheControl(self.session)
def retry_session():
    """Return a ``requests.Session`` that retries GET and POST requests.

    The retry policy allows up to 24 retries in total (5 for read errors,
    24 for connection errors), uses a backoff factor of 0.3 and forces a
    retry on HTTP 500, 502 and 504 responses. The same adapter is mounted
    for both ``http://`` and ``https://``.
    """
    # This will give the total wait time in minutes:
    # >>> sum([min((0.3 * (2 ** (i - 1))), 120) / 60 for i in range(24)])
    # >>> 30.5575
    # This works by using the minimum time in seconds of the backoff time
    # and the max back off time which defaults to 120 seconds. The backoff
    # time increases after every failed attempt.
    session = requests.Session()
    methods = ('GET', 'POST')
    retry_kwargs = dict(
        total=24,
        read=5,
        connect=24,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 504),
    )
    try:
        # urllib3 >= 1.26 spells the option ``allowed_methods``; the old
        # ``method_whitelist`` keyword was removed in urllib3 2.0.
        retry = Retry(allowed_methods=methods, **retry_kwargs)
    except TypeError:
        # Older urllib3 releases only know ``method_whitelist``.
        retry = Retry(method_whitelist=methods, **retry_kwargs)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Attach a retrying adapter to a session and return the session.

    ``retries`` is used as the total, read and connect retry budget. When
    ``session`` is falsy a new ``requests.Session`` is created; otherwise
    the given session is modified in place.
    """
    if not session:
        session = requests.Session()

    policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    shared_adapter = HTTPAdapter(max_retries=policy)
    for prefix in ('http://', 'https://'):
        session.mount(prefix, shared_adapter)
    return session
def monkeypatch(validate_certificate=True):
    """Make all Sessions use an App Engine adapter by default.

    Useful when you would rather not configure your own Sessions, or when
    you rely on libraries that call requests directly (e.g.
    ``requests.post``).

    .. warning::

        If ``validate_certificate`` is ``False``, certification validation
        will effectively be disabled for all requests.
    """
    _check_version()
    # HACK: We should consider modifying urllib3 to support this cleanly,
    # so that we can set a module-level variable in the sessions module,
    # instead of overriding an imported HTTPAdapter as is done here.
    if validate_certificate:
        chosen = AppEngineAdapter
    else:
        chosen = InsecureAppEngineAdapter

    sessions.HTTPAdapter = chosen
    adapters.HTTPAdapter = chosen
def get_http():
    """Get the http object.

    Returns a cached session (sqlite backend, GET/POST, status 200 only)
    whose ``https://`` adapter retries on HTTP 500, 502, 503 and 504.
    """
    ensure_dir(CLUSTERFUZZ_TESTCASES_DIR)
    http = requests_cache.CachedSession(
        cache_name=os.path.join(CLUSTERFUZZ_TESTCASES_DIR, 'http_cache'),
        backend='sqlite',
        allowable_methods=('GET', 'POST'),
        allowable_codes=[200],
        expire_after=HTTP_CACHE_TTL)
    # The Retry object must be passed as ``max_retries``: the first
    # positional parameter of HTTPAdapter is ``pool_connections``, so a
    # positional Retry would never be used as the retry policy.
    # backoff_factor is 0.5. Therefore, the max wait time is 16s.
    retry_policy = retry.Retry(
        total=5, backoff_factor=0.5,
        status_forcelist=[500, 502, 503, 504])
    http.mount('https://', adapters.HTTPAdapter(max_retries=retry_policy))
    return http
def __init__(self, executor=None, max_workers=2, session=None, *args,
             **kwargs):
    """Creates a FuturesSession

    Notes
    ~~~~~

    * ProcessPoolExecutor is not supported b/c Response objects are
      not picklable.

    * If you provide both `executor` and `max_workers`, the latter is
      ignored and provided executor is used as is.
    """
    super(FuturesSession, self).__init__(*args, **kwargs)
    if executor is None:
        executor = ThreadPoolExecutor(max_workers=max_workers)
        # Grow the connection pools to match the worker count when the
        # default pool size would be too small.
        if max_workers > DEFAULT_POOLSIZE:
            pool_options = {
                'pool_connections': max_workers,
                'pool_maxsize': max_workers,
            }
            for prefix in ('https://', 'http://'):
                self.mount(prefix, HTTPAdapter(**pool_options))

    self.executor = executor
    self.session = session
def __init__(self, document, password, omit_sensitive_data=False, quiet=False):
    """Set up the Nubank account record and the HTTP session.

    Prints a start-up banner unless ``quiet`` is truthy, then creates a
    session whose adapter for ``self.api_endpoint`` retries up to 32 times
    with pools of 50 connections, and installs the default headers.
    """
    if not quiet:
        print('[*] Nubank Parser is starting...')

    self.account = Account(document, None, password, account_type='card')
    self.omit_sensitive_data = omit_sensitive_data
    self.quiet = quiet
    self.account.currency = 'R$'
    self.account.bank = 'Nubank'

    self.session = requests.Session()
    endpoint_adapter = HTTPAdapter(max_retries=32, pool_connections=50, pool_maxsize=50)
    self.session.mount(self.api_endpoint, endpoint_adapter)

    default_headers = (
        ('User-Agent', 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36'),
        ('Content-Type', 'application/json'),
        ('Referer', 'https://conta.nubank.com.br/'),
    )
    for header_name, header_value in default_headers:
        self.session.headers.update({header_name: header_value})
def __init__(self, card, omit_sensitive_data=False, quiet=False, dbc_username=None, dbc_password=None,
             validator=TicketValidator):
    """Set up the Ticket account record, validator and HTTP session.

    Prints a start-up banner unless ``quiet`` is truthy. ``validator`` is
    a class (or factory) that is instantiated and stored before
    ``self.validate()`` runs. ``dbc_username`` and ``dbc_password`` are
    accepted for interface compatibility but are not used here.
    """
    if not quiet:
        print('[*] Ticket Parser is starting...')

    self.validator = validator()
    self.account = Account(card=card, account_type='card')
    self.validate()

    self.omit_sensitive_data = omit_sensitive_data
    self.quiet = quiet
    # The currency used to be assigned twice; once is enough.
    self.account.currency = 'R$'
    self.account.bank = 'Ticket'
    self.captcha = ''
    self.token = ''

    self.session = requests.Session()
    self.session.mount(self.api_endpoint, HTTPAdapter(max_retries=32, pool_connections=50, pool_maxsize=50))
    self.session.headers.update({'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36'})
    self.session.headers.update({'Content-Type': 'application/x-www-form-urlencoded'})
    self.session.headers.update({'Referer': 'http://www.ticket.com.br/portal/consulta-de-saldo/'})
def __init__(self, card, document, omit_sensitive_data=False, quiet=False, validator=SodexoValidator):
    """Set up the Sodexo account record, validator and HTTP session.

    Prints a start-up banner unless ``quiet`` is truthy. ``validator`` is
    instantiated and stored before ``self.validate()`` is called.
    """
    if not quiet:
        print('[*] Sodexo Parser is starting...')

    self.validator = validator()
    self.account = Account(document=document, card=card, account_type='card')
    self.validate()

    self.omit_sensitive_data = omit_sensitive_data
    self.quiet = quiet
    self.account.currency = 'R$'
    self.account.bank = 'Sodexo'

    self.session = requests.Session()
    endpoint_adapter = HTTPAdapter(max_retries=32, pool_connections=50, pool_maxsize=50)
    self.session.mount(self.api_endpoint, endpoint_adapter)

    default_headers = (
        ('User-Agent', 'Apache-HttpClient/android/Nexus 5'),
        ('Content-Type', 'application/x-www-form-urlencoded'),
    )
    for header_name, header_value in default_headers:
        self.session.headers.update({header_name: header_value})
def ubus_cd(session_id, account_id, action, out_params, url_param=None):
    """POST a JSON-RPC ``call`` to the ubus_cd endpoint and return the decoded reply.

    :param session_id: Session id; sent in the query string and as the first
        JSON-RPC parameter (stringified).
    :param account_id: Account id sent in the query string.
    :param action: Action name sent in the query string.
    :param out_params: List of further JSON-RPC parameters.
    :param url_param: Optional string appended verbatim to the URL.
    :return: The object decoded from the text between the first ``{`` and
        the last ``}`` of the response body, or whatever
        ``__handle_exception`` returns when the request fails.
    """
    url = "http://kjapi.peiluyou.com:5171/ubus_cd?account_id=%s&session_id=%s&action=%s" % (
        account_id, session_id, action)
    if url_param is not None:
        url += url_param

    params = ["%s" % session_id] + out_params
    data = {"jsonrpc": "2.0", "id": 1, "method": "call", "params": params}
    try:
        body = dict(data=json.dumps(data), action='onResponse%d' % int(time.time() * 1000))
        # The context manager closes the session (and its pooled sockets)
        # even when the request raises.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=5))
            proxies = api_proxies()
            r = s.post(url, data=body, proxies=proxies)
            text = r.text
        # Keep only the JSON object embedded in the response text.
        result = text[text.index('{'):text.rindex('}') + 1]
        return json.loads(result)
    except requests.exceptions.RequestException as e:
        return __handle_exception(e=e)
def _requests_retry_session(
    self,
    retries=3,
    backoff_factor=0.3,
    method_whitelist=('HEAD', 'GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'TRACE'),
    status_forcelist=(500, 502, 503, 504, 520, 524),
    session=None,
):
    """Attach a retrying adapter to a session and return the session.

    ``retries`` is used as the total, read, connect and status retry
    budget. ``method_whitelist`` lists the HTTP methods that may be
    retried. When ``session`` is falsy a new ``requests.Session`` is
    created; otherwise the given session is modified in place.
    """
    session = session or requests.Session()
    retry_kwargs = dict(
        total=retries,
        read=retries,
        connect=retries,
        status=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    try:
        # urllib3 >= 1.26 spells the option ``allowed_methods``; the old
        # ``method_whitelist`` keyword was removed in urllib3 2.0.
        retry = Retry(allowed_methods=method_whitelist, **retry_kwargs)
    except TypeError:
        # Older urllib3 releases only know ``method_whitelist``.
        retry = Retry(method_whitelist=method_whitelist, **retry_kwargs)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
def ubus_cd(session_id, account_id, action, out_params, url_param=None):
    """POST a JSON-RPC ``call`` to the ubus_cd endpoint and return the decoded reply.

    :param session_id: Session id; sent in the query string and as the first
        JSON-RPC parameter (stringified).
    :param account_id: Account id sent in the query string.
    :param action: Action name sent in the query string.
    :param out_params: List of further JSON-RPC parameters.
    :param url_param: Optional string appended verbatim to the URL.
    :return: The object decoded from the text between the first ``{`` and
        the last ``}`` of the response body, or whatever
        ``__handle_exception`` returns when the request fails.
    """
    url = "http://kjapi.peiluyou.com:5171/ubus_cd?account_id=%s&session_id=%s&action=%s" % (account_id, session_id, action)
    if url_param is not None:
        url += url_param

    params = ["%s" % session_id] + out_params
    data = {"jsonrpc": "2.0", "id": 1, "method": "call", "params": params}
    try:
        body = dict(data=json.dumps(data), action='onResponse%d' % int(time.time() * 1000))
        # The context manager closes the session (and its pooled sockets)
        # even when the request raises.
        with requests.Session() as s:
            s.mount('http://', HTTPAdapter(max_retries=5))
            proxies = api_proxies()
            r = s.post(url, data=body, proxies=proxies)
            text = r.text
        # Keep only the JSON object embedded in the response text.
        result = text[text.index('{'):text.rindex('}') + 1]
        return json.loads(result)
    except requests.exceptions.RequestException as e:
        return __handle_exception(e=e)
def get_soup(url, num_retries=10):
    """
    Takes in a url and returns the parsed BeautifulSoup code for that url with
    handling capabilities if the request 'bounces'.

    The request is retried up to ``num_retries`` times (backoff factor 0.1),
    including on HTTP 500, 502, 503 and 504 responses.
    """
    retries = Retry(
        total=num_retries,
        backoff_factor=0.1,
        status_forcelist=[500, 502, 503, 504],
    )
    # Close the session once the page text has been read.
    with requests.Session() as s:
        adapter = HTTPAdapter(max_retries=retries)
        # Mount for both schemes; with 'http://' alone, https URLs would
        # silently fall back to the default adapter without retries.
        s.mount('http://', adapter)
        s.mount('https://', adapter)
        html = s.get(url).text
    return BeautifulSoup(html, 'html.parser')
def __init__(self, loginer=None, use_proxy=False):
    """Store the login helper, proxy settings and default request headers.

    When ``use_proxy`` is truthy a ``ProxyPool`` is created; proxy use is
    switched off again if that pool is empty.
    """
    self.loginer = loginer
    self.use_proxy = use_proxy
    if use_proxy:
        self.proxy_pool = ProxyPool()
        if len(self.proxy_pool) == 0:
            self.use_proxy = False

    self._headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, sdch",
        "Accept-Language": "zh-CN,zh;q=0.8",
    }
    self._request_retry = HTTPAdapter(max_retries=3)
    # Cookies start out as an empty mapping.
    self._cookies = dict()
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK):
    """Set up the ``AsyncPoolManager`` used by this adapter.

    Not meant to be called from user code; it is exposed only for
    subclasses of :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param connections: The number of urllib3 connection pools to cache.
    :param maxsize: The maximum number of connections to save in the pool.
    :param block: Block when no free connections are available.
    """
    # Remember the pool settings (the original notes these are kept for
    # pickling).
    self._pool_connections, self._pool_maxsize, self._pool_block = (
        connections, maxsize, block)

    manager = AsyncPoolManager(num_pools=connections, maxsize=maxsize,
                               block=block)
    self.poolmanager = manager
    self.connections = []
def get_connection(self, url, proxies=None):
    """Return a urllib3 connection for ``url`` and record it.

    Not meant to be called from user code; it is exposed only for
    subclasses of :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used
        on this request.
    """
    proxy_map = proxies if proxies else {}
    scheme = urlparse(url.lower()).scheme
    proxy = proxy_map.get(scheme)

    if not proxy:
        # Only scheme should be lower case
        url = urlparse(url).geturl()
        conn = self.poolmanager.connection_from_url(url)
    else:
        proxy_headers = self.proxy_headers(proxy)
        # Create the proxy manager lazily, once per proxy URL.
        if proxy not in self.proxy_manager:
            self.proxy_manager[proxy] = proxy_from_url(
                proxy,
                proxy_headers=proxy_headers,
                num_pools=self._pool_connections,
                maxsize=self._pool_maxsize,
                block=self._pool_block,
            )
        conn = self.proxy_manager[proxy].connection_from_url(url)

    self.connections.append(conn)
    return conn
def demo(base_url):
    """Login through a third-party OAuth handler and print some stats.

    Parameters
    ----------
    base_url : str
        Base URL of the CMS server.
    """
    # URLs always use forward slashes; os.path.join would insert
    # backslashes on Windows, so join with posixpath instead.
    import posixpath

    session = requests.Session()
    adapter = HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.02))
    session.mount('{}://'.format(urlparse(base_url).scheme), adapter)

    wb = webbrowser.get()
    login_url = posixpath.join(base_url, "login?complete=no")
    session.get(login_url)
    wb.open(login_url)

    auth_url = input("Enter the URL returned after authentication:")
    response = session.get(auth_url.replace("complete=no", 'complete=yes'))
    assert response.status_code == 200

    print(session.get(posixpath.join(base_url, 'me')).content)
def get_requests_session(self):
    """Return the shared requests session, creating it on first use.

    The session is looked up with ``self._session.GetParameter``; when that
    yields ``None`` a new one is built, stored with
    ``self._session.SetCache`` and returned.
    """
    requests_session = self._session.GetParameter("requests_session")
    if requests_session is None:
        # To make sure we can use the requests session in the threadpool we
        # need to make sure that the connection pool can block. Otherwise it
        # will raise when it runs out of connections and the threads will be
        # terminated.
        requests_session = requests.Session()
        for prefix in ("https://", "http://"):
            requests_session.mount(prefix, adapters.HTTPAdapter(
                pool_connections=10, pool_maxsize=300, max_retries=10,
                pool_block=True))

        self._session.SetCache("requests_session", requests_session)

    return requests_session
def list_folders(csrftoken, sessionid):
    """Fetch the webmail folder list and print each folder's name.

    ``csrftoken`` and ``sessionid`` are sent as a ``Cookie`` header. The
    request URL and the decoded metadata are printed along the way.
    """
    print("Getting list of folders")

    # Session with retries (backoff factor 1) on HTTP 500/502/504.
    client = requests.Session()
    retry_policy = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 504])
    client.mount('https://', HTTPAdapter(max_retries=retry_policy))

    cookie_value = 'csrftoken={0};sessionid={1}'.format(csrftoken, sessionid)
    client.headers.update({'Cookie': cookie_value})

    mdatareq = 'https://app.openmailbox.org/requests/webmail?action=folderlist'
    print(mdatareq)

    reply = client.get(mdatareq)
    metadata = json.loads(reply.text)
    print(metadata)

    print('\nFolder names:')
    for folder in metadata['folders']:
        print(folder['name'])
def _wait_for_server_start(self):
    """
    Wait for the mock service to be ready for requests.

    Issues a GET to ``self.uri`` with up to 15 retries (backoff factor
    0.1). If the final response is not HTTP 200, the service process is
    terminated before the error is raised.

    :rtype: None
    :raises RuntimeError: If there is a problem starting the mock service.
    """
    s = requests.Session()
    retries = Retry(total=15, backoff_factor=0.1)
    s.mount('http://', HTTPAdapter(max_retries=retries))
    resp = s.get(self.uri, headers=self.HEADERS)
    if resp.status_code != 200:
        self._process.terminate()
        self._process.communicate()
        # Interpolate the response text into the message; passing it as
        # a second exception argument left the '%s' unformatted.
        raise RuntimeError(
            'There was a problem starting the mock service: %s' % resp.text)
def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
    """
    Set up the urllib3 ``PoolManager`` with this adapter's SSL version.

    Not meant to be called from user code; it is exposed only for
    subclasses of :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    :param connections: The number of urllib3 connection pools to cache.
    :param maxsize: The maximum number of connections to save in the pool.
    :param block: Block when no free connections are available.
    :param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager.
    """
    # Remember the pool settings on the adapter.
    self._pool_connections, self._pool_maxsize, self._pool_block = (
        connections, maxsize, block)

    manager = PoolManager(
        num_pools=connections,
        maxsize=maxsize,
        block=block,
        strict=True,
        ssl_version=self.SSL_VERSION,
        **pool_kwargs
    )
    self.poolmanager = manager
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(502, 504),
    session=None,
):
    """Return ``session`` (or a new one) with a retrying adapter mounted.

    ``retries`` is used as the total, read and connect retry budget. The
    same adapter is mounted for ``http://`` and ``https://``.
    """
    target = session if session else requests.Session()

    retry_settings = {
        'total': retries,
        'read': retries,
        'connect': retries,
        'backoff_factor': backoff_factor,
        'status_forcelist': status_forcelist,
    }
    retrying_adapter = HTTPAdapter(max_retries=Retry(**retry_settings))

    target.mount('http://', retrying_adapter)
    target.mount('https://', retrying_adapter)
    return target
def get_url(url):
    """
    Get the url

    The request uses a 5 second timeout and a retry policy of up to 10
    retries with a backoff factor of 0.1.

    :param url: given url
    :return: page, or ``None`` when the request fails with an HTTP error
        status or a connection error
    """
    session = requests.Session()
    retries = Retry(total=10, backoff_factor=.1)
    adapter = HTTPAdapter(max_retries=retries)
    # Mount for both schemes; with 'http://' alone, https URLs would
    # silently fall back to the default adapter without retries.
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    try:
        response = session.get(url, timeout=5)
        response.raise_for_status()
    except (requests.exceptions.HTTPError, requests.exceptions.ConnectionError):
        return None
    return response
def dir_bruter(self, target_url, word_queue, user_agent):
    """Request ``target_url + word`` for every queued word.

    :param target_url: Base URL that each word is appended to.
    :param word_queue: Queue of path fragments; drained until empty.
    :param user_agent: Value sent as the ``User-Agent`` header.
    :return: Dict mapping each URL that answered HTTP 200 to its status
        code. Other non-404 responses are only logged as errors.
    """
    results = {}
    session = requests.Session()
    # Mount on '<scheme>://'. A bare 'http'/'https' prefix is shorter than
    # the session's default 'http://' / 'https://' prefixes, which are
    # matched first, so the retrying adapter would never be selected.
    scheme = target_url.split(':', 1)[0]
    session.mount(scheme + '://', HTTPAdapter(max_retries=3))

    headers = {"User-Agent": user_agent}
    while not word_queue.empty():
        brute = word_queue.get()
        request = session.get(target_url + brute, headers=headers, verify=False)
        if request.status_code == 200:
            message = "{i} [{r}] => {u}".format(i=ctinfo, r=request.status_code, u=request.url)
            print(message)
            logging.info(message)
            results[request.url] = request.status_code
        elif request.status_code != 404:
            # TODO: add a setting `only_save_200` or something like that, if no, save these results.
            logging.error("{e} {c} => {u}".format(e=cterr, c=request.status_code, u=request.url))
    return results
def _endpoint_premium_get(self, url):
    """GET ``url`` with basic auth and return the decoded JSON body.

    The ``http://`` adapter retries up to 5 times on HTTP 429.

    :return: The decoded JSON when it is truthy; ``None`` for HTTP 404/422
        or an empty result.
    :raises ConnectionError: When the request fails to connect.
    :raises BadRequest: On HTTP 400.
    """
    retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[429])
    # The context manager closes the session even when the request
    # raises; the explicit close() was skipped on that path.
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=retries))
        try:
            r = s.get(url, auth=(self.username, self.api_key))
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(repr(e))

    if r.status_code in [404, 422]:
        return None
    if r.status_code == 400:
        raise BadRequest('Bad Request for url {}'.format(url))

    results = r.json()
    return results if results else None
def _endpoint_premium_delete(self, url):
    """DELETE ``url`` with basic auth.

    The ``http://`` adapter retries up to 5 times on HTTP 429.

    :return: ``True`` on HTTP 200; ``None`` on HTTP 404 and on any other
        status that is not handled explicitly.
    :raises ConnectionError: When the request fails to connect.
    :raises BadRequest: On HTTP 400.
    """
    retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[429])
    # The context manager closes the session even when the request
    # raises; the explicit close() was skipped on that path.
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=retries))
        try:
            r = s.delete(url, auth=(self.username, self.api_key))
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(repr(e))

    if r.status_code == 400:
        raise BadRequest('Bad Request for url {}'.format(url))
    if r.status_code == 200:
        return True
    if r.status_code == 404:
        return None
def _endpoint_premium_put(self, url, payload=None):
    """PUT ``payload`` to ``url`` with basic auth.

    The ``http://`` adapter retries up to 5 times on HTTP 429.

    :return: ``True`` on HTTP 200; ``None`` on HTTP 404/422 and on any
        other status that is not handled explicitly.
    :raises ConnectionError: When the request fails to connect.
    :raises BadRequest: On HTTP 400.
    """
    retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[429])
    # The context manager closes the session even when the request
    # raises; the explicit close() was skipped on that path.
    with requests.Session() as s:
        s.mount('http://', HTTPAdapter(max_retries=retries))
        try:
            r = s.put(url, data=payload, auth=(self.username, self.api_key))
        except requests.exceptions.ConnectionError as e:
            raise ConnectionError(repr(e))

    if r.status_code == 400:
        raise BadRequest('Bad Request for url {}'.format(url))
    if r.status_code == 200:
        return True
    if r.status_code in [404, 422]:
        return None

# Get Show object
def setup_http_session(self):
    """Replace ``self.http_session`` with a freshly configured session.

    Any existing (truthy) session is closed first. The new session sends
    ``USER_AGENT`` and gets a separate retrying adapter per scheme (5
    retries, backoff factor 0.5, forced on HTTP 500/503).
    """
    if self.http_session:
        self.http_session.close()

    self.http_session = Session()
    self.http_session.headers['User-Agent'] = USER_AGENT

    # Each scheme gets its own Retry and adapter instance.
    for prefix in ('http://', 'https://'):
        retry_policy = Retry(total=5, status_forcelist=[500, 503], backoff_factor=.5)
        self.http_session.mount(prefix, HTTPAdapter(max_retries=retry_policy))
def http_session(self):
    """Return the :class:`requests.Session` for this object.

    The session is built on first use and kept in ``_http_session``.
    Building it also disables urllib3 warnings."""
    if getattr(self, '_http_session', None):
        return self._http_session

    requests.packages.urllib3.disable_warnings()
    new_session = requests.Session()
    new_session.headers['User-Agent'] = USER_AGENT

    # Each scheme gets its own Retry and adapter instance.
    for prefix in ('http://', 'https://'):
        retry_policy = Retry(total=5, status_forcelist=[500, 503], backoff_factor=.5)
        new_session.mount(prefix, HTTPAdapter(max_retries=retry_policy))

    self._http_session = new_session
    return self._http_session
def __init__(self, endpoint=None, application_key=None,
             application_secret=None, consumer_key=None, timeout=TIMEOUT):
    """Store the API credentials and build the retrying HTTP session.

    ``endpoint`` is looked up in ``ENDPOINTS``. The session retries 5
    times; when urllib3's ``Retry`` can be imported it also applies a
    backoff factor of 0.2 and retries on HTTP 422/500/502/503/504.
    """
    from requests import Session
    from requests.adapters import HTTPAdapter

    self._endpoint = ENDPOINTS[endpoint]
    self._application_key = application_key
    self._application_secret = application_secret
    self._consumer_key = consumer_key

    # lazy load time delta
    self._time_delta = None

    try:
        # Some older versions of requests do not have the urllib3
        # vendorized package
        from requests.packages.urllib3.util.retry import Retry
    except ImportError:
        retries = 5
    else:
        retries = Retry(
            total=5,
            backoff_factor=0.2,
            status_forcelist=[422, 500, 502, 503, 504]
        )

    # use a requests session to reuse connections between requests
    self._session = Session()
    for prefix in ('https://', 'http://'):
        self._session.mount(prefix, HTTPAdapter(max_retries=retries))

    # Override default timeout
    self._timeout = timeout
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
):
    """Create a requests session whose adapters carry a ``Retry`` policy.

    ``retries`` is used as the total, read and connect retry budget, so
    connection failures are retried as well as the statuses listed in
    ``status_forcelist``. ``backoff_factor`` is passed straight to
    ``Retry``; its meaning is described in the urllib3 documentation:
    https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.retry.Retry

    The same adapter is mounted for ``http://`` and ``https://``.
    """
    policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    shared_adapter = HTTPAdapter(max_retries=policy)

    new_session = requests.Session()
    for prefix in ('http://', 'https://'):
        new_session.mount(prefix, shared_adapter)
    return new_session
def _on_request(self, request, **kwargs):
    """Answer ``request`` from the registered matches.

    When nothing matches, a ``ConnectionError`` is recorded in
    ``self._calls`` and raised. Otherwise a response is built from the
    match, recorded and returned.
    """
    match = self._find_match(request)

    # TODO(dcramer): find the correct class for this
    if match is None:
        failure = ConnectionError('Connection refused: {0}'.format(request.url))
        self._calls.add(request, failure)
        raise failure

    headers = {'Content-Type': match['content_type']}
    extra_headers = match['adding_headers']
    if extra_headers:
        headers.update(extra_headers)

    raw_response = HTTPResponse(
        status=match['status'],
        body=BufferIO(match['body']),
        headers=headers,
        preload_content=False,
    )
    response = HTTPAdapter().build_response(request, raw_response)

    # Touch ``content`` so the body is read for non-streaming matches.
    if not match['stream']:
        response.content  # NOQA

    self._calls.add(request, response)
    return response
def __init__(self, base_url, queries=None, **kwargs):
    """Constructor

    Args:
        base_url (str): the server's url
        queries (Optional[Query]): the queries
        **kwargs: optional overrides: ``timeout``, ``max_retries``,
            ``max_workers``, ``user_agent`` and ``x_forwarded_for``.
    """
    # Apply the overrides first. They used to be applied after the
    # session and retry policy were built, so ``max_workers`` and
    # ``max_retries`` never took effect.
    for option, attribute in (('timeout', 'TIMEOUT'),
                              ('max_retries', 'MAX_RETRIES'),
                              ('max_workers', 'MAX_WORKERS'),
                              ('user_agent', 'USER_AGENT')):
        if option in kwargs:
            setattr(self, attribute, kwargs[option])
    if 'x_forwarded_for' in kwargs:
        self.X_FORWARDED_FOR = utils.get_x_fwded_for_str(kwargs['x_forwarded_for'])

    self.session = FuturesSession(max_workers=self.MAX_WORKERS)
    # Without an override the class-level default is used, as before.
    retries = Retry(total=kwargs.get('max_retries', Connection.MAX_RETRIES),
                    backoff_factor=1,
                    status_forcelist=Connection.STATUS_FORCELIST)
    self.session.mount(base_url, HTTPAdapter(max_retries=retries))

    self.results = []
    self.queries = queries

    self.exec_queries()
def __init__(self, base_url=BASE_URL, login_url=LOGIN_URL, session=None):
    """Remember the URLs and prepare a session that retries on HTTP 500/503.

    ``self.url`` is ``base_url`` without trailing slashes plus ``/mods``.
    A given ``session`` is reused and modified in place; otherwise a new
    one is created.
    """
    self.base_url = base_url
    self.login_url = login_url
    self.url = '{0}/mods'.format(base_url.rstrip('/'))

    if not session:
        session = requests.session()
    self.session = session

    retrying_adapter = HTTPAdapter(max_retries=Retry(status_forcelist=[500, 503]))
    for prefix in ('https://', 'http://'):
        self.session.mount(prefix, retrying_adapter)
def new_connection(self, cluster_api, provider):
    """Build and return a configured ``TimeoutSession`` for ``provider``.

    Authentication uses the configured client certificate provider when
    there is one, otherwise the provider's username and password.
    """
    config = cluster_api.nsxlib_config
    session = TimeoutSession(config.http_timeout,
                             config.http_read_timeout)

    cert_provider = config.client_cert_provider
    if not cert_provider:
        session.auth = (provider.username, provider.password)
    else:
        session.cert_provider = cert_provider

    # NSX v3 doesn't use redirects
    session.max_redirects = 0

    session.verify = not config.insecure
    if session.verify and provider.ca_file:
        # verify using the said ca bundle path
        session.verify = provider.ca_file

    # we are pooling with eventlet in the cluster class
    single_conn_adapter = adapters.HTTPAdapter(
        pool_connections=1, pool_maxsize=1,
        max_retries=config.retries,
        pool_block=False)
    for prefix in ('http://', 'https://'):
        session.mount(prefix, single_conn_adapter)

    self.get_default_headers(session, provider,
                             config.allow_overwrite_header)

    return session
def rsess():
    """Return a session with ``MAX_RETRIES`` retries and the ``USER_AGENT`` header."""
    session = requests.Session()
    # Just so one random 500 doesn't break an uptime; two consecutive errors are
    # worrying though.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, HTTPAdapter(max_retries=MAX_RETRIES))
    session.headers.update({'User-Agent': USER_AGENT})
    return session
def get_async_requests_session(num_retries, backoff_factor, pool_size,
                               status_forcelist=(500, 502, 503, 504)):
    """Return a ``FuturesSession`` that auto-retries failed requests.

    :param num_retries: Total number of retries.
    :param backoff_factor: Backoff factor handed to ``Retry``.
    :param pool_size: Worker count, also used for both connection pool
        sizes of each adapter.
    :param status_forcelist: Status codes that force a retry. The default
        is a tuple so it cannot be mutated between calls.
    """
    # Use requests & urllib3 to auto-retry.
    # If the backoff_factor is 0.1, then sleep() will sleep for [0.1s, 0.2s,
    # 0.4s, ...] between retries. It will also force a retry if the status
    # code returned is in status_forcelist.
    session = FuturesSession(max_workers=pool_size)

    # If any regular response is generated, no retry is done. Without using
    # the status_forcelist, even a response with status 500 will not be
    # retried.
    retries = Retry(total=num_retries, backoff_factor=backoff_factor,
                    status_forcelist=status_forcelist)

    # Mount handler on both HTTP & HTTPS.
    session.mount('http://', HTTPAdapter(max_retries=retries,
                                         pool_connections=pool_size,
                                         pool_maxsize=pool_size))
    session.mount('https://', HTTPAdapter(max_retries=retries,
                                          pool_connections=pool_size,
                                          pool_maxsize=pool_size))

    return session

# Evaluates the status of PTC and Niantic request futures, and returns the
# result (optionally with an error).
# Warning: blocking! Can only get status code if request has finished.
def download_webpage(target_url, proxy=None, timeout=5):
    """Download ``target_url`` and return its body.

    :param target_url: URL to fetch.
    :param proxy: Proxy URL used for both http and https traffic, or None.
    :param timeout: Request timeout passed to requests.
    :return: The response content when the status is 200, otherwise None.
    """
    s = requests.Session()
    retries = Retry(total=3,
                    backoff_factor=0.5,
                    status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retries)
    # Mount for both schemes; with 'http://' alone, https URLs would
    # silently fall back to the default adapter without retries.
    s.mount('http://', adapter)
    s.mount('https://', adapter)

    headers = {
        'User-Agent': ('Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) ' +
                       'Gecko/20100101 Firefox/54.0'),
        'Referer': 'http://google.com'
    }

    r = s.get(target_url,
              proxies={'http': proxy, 'https': proxy},
              timeout=timeout,
              headers=headers)

    if r.status_code == 200:
        return r.content

    return None

# Sockslist.net uses javascript to obfuscate proxies port number.
# Builds a dictionary with decoded values for each variable.
# Dictionary = {'var': intValue, ...})
def test_transport_adapter_ordering(self):
    """Check the order in which a session lists its mounted prefixes."""
    s = requests.Session()
    assert list(s.adapters) == ['https://', 'http://']

    for prefix in ('http://git',
                   'http://github',
                   'http://github.com',
                   'http://github.com/about/'):
        s.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://github.com',
        'http://github',
        'http://git',
        'https://',
        'http://',
    ]
    assert list(s.adapters) == expected

    for prefix in ('http://gittip',
                   'http://gittip.com',
                   'http://gittip.com/about/'):
        s.mount(prefix, HTTPAdapter())
    expected = [
        'http://github.com/about/',
        'http://gittip.com/about/',
        'http://github.com',
        'http://gittip.com',
        'http://github',
        'http://gittip',
        'http://git',
        'https://',
        'http://',
    ]
    assert list(s.adapters) == expected

    # Mounting must still work after ``adapters`` was replaced by a
    # plain dict.
    s2 = requests.Session()
    s2.adapters = {'http://': HTTPAdapter()}
    s2.mount('https://', HTTPAdapter())
    for prefix in ('http://', 'https://'):
        assert prefix in s2.adapters
def test_urllib3_retries(httpbin):
    """A forced retry on HTTP 500 must end in ``RetryError``."""
    from requests.packages.urllib3.util import Retry

    retry_policy = Retry(total=2, status_forcelist=[500])
    session = requests.Session()
    session.mount('http://', HTTPAdapter(max_retries=retry_policy))

    with pytest.raises(RetryError):
        session.get(httpbin('status/500'))
def test_urllib3_pool_connection_closed(httpbin):
    """With zero-sized pools, a raised ``ConnectionError`` must mention the closed pool."""
    session = requests.Session()
    zero_pool_adapter = HTTPAdapter(pool_connections=0, pool_maxsize=0)
    session.mount('http://', zero_pool_adapter)

    try:
        session.get(httpbin('status/200'))
    except ConnectionError as exc:
        message = str(exc)
        assert u"Pool is closed." in message
def start(self):
    """Open the streaming download and record its total size.

    Must only be called once per download. Raises via
    ``raise_for_status`` when the server answers with an error status.
    """
    assert not self.started, "File download has already been started, and cannot be started again"

    # initialize the requests session, with backoff-retries enabled
    self.session = requests.Session()
    for prefix in ('http://', 'https://'):
        self.session.mount(prefix, HTTPAdapter(max_retries=retries))

    # initiate the download, check for status errors, and calculate download size
    response = self.session.get(self.source, stream=True, timeout=self.timeout)
    self.response = response
    self.response.raise_for_status()

    content_length = self.response.headers['content-length']
    self.total_size = int(content_length)

    self.started = True
def __init__(self, config):
    """Create the HTTP client described by ``config``.

    Retries (``config.connection_retries``, backoff factor 1, forced on
    HTTP 500/502/503/504) are mounted for ``config.protocol`` only. When
    ``config`` has a truthy ``timeout``, every ``send`` call receives it
    as a default.
    """
    self.config = config
    self.client = Session()

    retry_policy = Retry(
        total=self.config.connection_retries,
        backoff_factor=1,
        status_forcelist=[500, 502, 503, 504]
    )
    prefix = self.config.protocol + "://"
    self.client.mount(prefix, HTTPAdapter(max_retries=retry_policy))

    has_timeout = hasattr(self.config, "timeout") and self.config.timeout
    if has_timeout:
        # Bind the configured timeout onto ``send``.
        self.client.send = partial(
            self.client.send,
            timeout=self.config.timeout
        )
def __init__(self, server_http_url='', *args, **kwargs):
    """Create a JSON session with certificate verification turned off.

    ``server_http_url`` is stored as-is; extra positional and keyword
    arguments are accepted but not used here.
    """
    self.server_http_url = server_http_url

    self.session = requests.session()
    json_headers = {"Accept": "application/json",
                    "Content-type": "application/json"}
    self.session.headers = json_headers
    self.session.verify = False

    # Increase the number of pool connections so we can test large numbers
    # of connections to the api.
    large_pool_adapter = adapters.HTTPAdapter(pool_connections=2000, pool_maxsize=2000)
    self.session.mount('http://', large_pool_adapter)
def __init__(self, timeout=None, retries=None):
    """Create a session with a default timeout and a retry policy.

    :param timeout: Stored on the session as ``self.timeout``.
    :param retries: ``None`` to use ``DEFAULT_RETRY_ARGS``, an int to
        override only the ``total`` of those defaults, or a dict of
        keyword arguments for ``Retry``.
    :raises TypeError: If ``retries`` is of any other type.
    """
    super(Session, self).__init__()
    self.timeout = timeout

    if retries is None:
        retry = Retry(**DEFAULT_RETRY_ARGS)
    elif isinstance(retries, int):
        args = DEFAULT_RETRY_ARGS.copy()
        args.pop('total', None)
        retry = Retry(total=retries, **args)
    elif isinstance(retries, dict):
        retry = Retry(**retries)
    else:
        # Any other type used to leave ``retry`` unbound and fail below
        # with an UnboundLocalError.
        raise TypeError(
            'retries must be None, an int or a dict, got {0!r}'.format(retries))

    self.mount('http://', HTTPAdapter(max_retries=retry))
    self.mount('https://', HTTPAdapter(max_retries=retry))