我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用requests_cache.CachedSession()。
def get_http(): """Get the http object.""" ensure_dir(CLUSTERFUZZ_TESTCASES_DIR) http = requests_cache.CachedSession( cache_name=os.path.join(CLUSTERFUZZ_TESTCASES_DIR, 'http_cache'), backend='sqlite', allowable_methods=('GET', 'POST'), allowable_codes=[200], expire_after=HTTP_CACHE_TTL) http.mount( 'https://', adapters.HTTPAdapter( # backoff_factor is 0.5. Therefore, the max wait time is 16s. retry.Retry( total=5, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])) ) return http
def __init__(self): # Create the requests cache self.session = requests_cache.CachedSession( cache_name='cache', backend='sqlite', expire_after=seconds_to_cache)
def run(ctx, config_yaml, output_csv): """This command loads config.yaml and the current ENV-ironment, creates a single merged dict, and prints to stdout. """ # read the configuration file c = app.get_config_dict(ctx, [config_yaml]) # use cache to reduce web traffic session = requests_cache.CachedSession( cache_name='cache', backend='sqlite', expire_after=datetime.timedelta(days=days_to_cache)) # all data will also be combined into one CSV all_df = None for ticker in c['config']['options']['covered_calls']: option = Options(ticker, 'yahoo', session=session) # fetch all data df = option.get_all_data() # process the data df = covered_calls_process_dataframe(df) # ensure the all_df (contains all data from all tickers) if all_df is None: all_df = df.copy(deep=True) else: all_df = all_df.append(df) # output the all_df, which contains all of the tickers covered_calls_csv_out(output_csv, all_df) ##################################################################### # Functions
def run(ctx, config_yaml, output_csv): """This command loads config.yaml and the current ENV-ironment, creates a single merged dict, and prints to stdout. """ # read the configuration file c = app.get_config_dict(ctx, [config_yaml]) # use cache to reduce web traffic session = requests_cache.CachedSession( cache_name='cache', backend='sqlite', expire_after=datetime.timedelta(days=days_to_cache)) # all data will also be combined into one CSV all_df = None for ticker in c['config']['options']['long_puts']: option = Options(ticker, 'yahoo', session=session) # fetch all data df = option.get_all_data() # process the data df = long_puts_process_dataframe(df) # ensure the all_df (contains all data from all tickers) if all_df is None: all_df = df.copy(deep=True) else: all_df = all_df.append(df) # output the all_df, which contains all of the tickers long_puts_csv_out(output_csv, all_df) ##################################################################### # Functions
def download(self, url, headers, proxies): """ Download a and return the page content args: url (str): URL headers (dict): dict of headers (like user_agent) proxies (dict): proxy dict w/ keys 'http'/'https', values are strs (i.e. 'http(s)://IP') (default: None) """ session = requests_cache.CachedSession() session.hooks = {'response': self.make_throttle_hook(self.throttle)} try: resp = session.get(url, headers=headers, proxies=proxies, timeout=self.timeout) html = resp.text if resp.status_code >= 400: print('Download error:', resp.text) html = None if self.num_retries and 500 <= resp.status_code < 600: # recursively retry 5xx HTTP errors self.num_retries -= 1 return self.download(url, headers, proxies) except requests.exceptions.RequestException as e: print('Download error:', e) return {'html': None, 'code': 500} return {'html': html, 'code': resp.status_code}