Python requests_cache module: CachedSession() example source code

We collected the following 6 code examples from open source Python projects to illustrate how to use requests_cache.CachedSession().
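All six examples share the same basic pattern: create a CachedSession (usually backed by SQLite, with an expiry), then use it exactly like a regular requests.Session. Below is a minimal sketch of that pattern; the cache name, URL and one-hour expiry are illustrative values, not taken from any of the projects.

import datetime
import requests_cache

# Responses are cached in a local SQLite file ('demo_cache.sqlite').
session = requests_cache.CachedSession(
    cache_name='demo_cache',
    backend='sqlite',
    expire_after=datetime.timedelta(hours=1))

# Use it like a normal requests.Session; repeat requests are served from cache.
response = session.get('https://httpbin.org/get')
print(response.from_cache)  # False on the first call, True on cache hits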

Project: clusterfuzz-tools    Author: google
def get_http():
  """Get the http object."""
  ensure_dir(CLUSTERFUZZ_TESTCASES_DIR)
  http = requests_cache.CachedSession(
      cache_name=os.path.join(CLUSTERFUZZ_TESTCASES_DIR, 'http_cache'),
      backend='sqlite',
      allowable_methods=('GET', 'POST'),
      allowable_codes=[200],
      expire_after=HTTP_CACHE_TTL)
  http.mount(
      'https://',
      adapters.HTTPAdapter(
          # backoff_factor is 0.5. Therefore, the max wait time is 16s.
          max_retries=retry.Retry(
              total=5, backoff_factor=0.5,
              status_forcelist=[500, 502, 503, 504]))
  )
  return http
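In this project, GET as well as POST responses with status 200 are cached, and HTTPS requests are retried on 5xx errors. A hypothetical usage sketch (the URL is made up; CLUSTERFUZZ_TESTCASES_DIR and HTTP_CACHE_TTL are module-level constants of the project that are not shown here):

http = get_http()
# Served from the SQLite cache on repeat calls within HTTP_CACHE_TTL.
resp = http.get('https://example.com/testcase')  # hypothetical URL
print(resp.status_code)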
Project: options-screener    Author: dcwangmit01
def __init__(self):
        # Create the requests cache
        self.session = requests_cache.CachedSession(
            cache_name='cache',
            backend='sqlite',
            expire_after=seconds_to_cache)
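Here expire_after is given as a plain number of seconds (seconds_to_cache is a module-level setting of the project), whereas the next two snippets pass a datetime.timedelta; requests_cache accepts both forms. A small illustration with made-up values:

import datetime
import requests_cache

# Two equivalent ways to express a one-hour expiry (illustrative values).
by_seconds = requests_cache.CachedSession(
    cache_name='cache', backend='sqlite', expire_after=3600)
by_timedelta = requests_cache.CachedSession(
    cache_name='cache', backend='sqlite',
    expire_after=datetime.timedelta(hours=1))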
Project: options-screener    Author: dcwangmit01
def run(ctx, config_yaml, output_csv):
    """This command loads config.yaml and the current ENV-ironment,
    creates a single merged dict, and prints to stdout.
    """

    # read the configuration file
    c = app.get_config_dict(ctx, [config_yaml])

    # use cache to reduce web traffic
    session = requests_cache.CachedSession(
        cache_name='cache',
        backend='sqlite',
        expire_after=datetime.timedelta(days=days_to_cache))

    # all data will also be combined into one CSV
    all_df = None

    for ticker in c['config']['options']['covered_calls']:
        option = Options(ticker, 'yahoo', session=session)

        # fetch all data
        df = option.get_all_data()

        # process the data
        df = covered_calls_process_dataframe(df)

        # accumulate data from all tickers into all_df
        if all_df is None:
            all_df = df.copy(deep=True)
        else:
            all_df = all_df.append(df)

        # write all_df (all tickers processed so far) to the CSV
        covered_calls_csv_out(output_csv, all_df)


#####################################################################
# Functions
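The key point above is that the CachedSession is handed to pandas-datareader's Options via its session= argument, so repeated runs within days_to_cache read option chains from the local SQLite cache instead of hitting Yahoo again. A minimal sketch of that pattern (the ticker and expiry are illustrative, and the Yahoo options endpoint no longer works in recent pandas-datareader releases):

import datetime
import requests_cache
from pandas_datareader.data import Options

session = requests_cache.CachedSession(
    cache_name='cache', backend='sqlite',
    expire_after=datetime.timedelta(days=3))  # illustrative expiry

# Any API that accepts a requests.Session also accepts a CachedSession.
option = Options('AAPL', 'yahoo', session=session)  # illustrative ticker
df = option.get_all_data()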
Project: options-screener    Author: dcwangmit01
def run(ctx, config_yaml, output_csv):
    """This command loads config.yaml and the current ENV-ironment,
    creates a single merged dict, and prints to stdout.
    """

    # read the configuration file
    c = app.get_config_dict(ctx, [config_yaml])

    # use cache to reduce web traffic
    session = requests_cache.CachedSession(
        cache_name='cache',
        backend='sqlite',
        expire_after=datetime.timedelta(days=days_to_cache))

    # all data will also be combined into one CSV
    all_df = None

    for ticker in c['config']['options']['long_puts']:
        option = Options(ticker, 'yahoo', session=session)

        # fetch all data
        df = option.get_all_data()

        # process the data
        df = long_puts_process_dataframe(df)

        # accumulate data from all tickers into all_df
        if all_df is None:
            all_df = df.copy(deep=True)
        else:
            all_df = all_df.append(df)

        # write all_df (all tickers processed so far) to the CSV
        long_puts_csv_out(output_csv, all_df)


#####################################################################
# Functions
Project: wswp    Author: kjam
def download(self, url, headers, proxies):
        """ Download a and return the page content
            args:
                url (str): URL
                headers (dict): dict of headers (like user_agent)
                proxies (dict): proxy dict w/ keys 'http'/'https', values
                    are strs (i.e. 'http(s)://IP') (default: None)
        """
        session = requests_cache.CachedSession()
        session.hooks = {'response': self.make_throttle_hook(self.throttle)}

        try:
            resp = session.get(url, headers=headers, proxies=proxies,
                               timeout=self.timeout)
            html = resp.text
            if resp.status_code >= 400:
                print('Download error:', resp.text)
                html = None
                if self.num_retries and 500 <= resp.status_code < 600:
                    # recursively retry 5xx HTTP errors
                    self.num_retries -= 1
                    return self.download(url, headers, proxies)
        except requests.exceptions.RequestException as e:
            print('Download error:', e)
            return {'html': None, 'code': 500}
        return {'html': html, 'code': resp.status_code}
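The snippet relies on a make_throttle_hook helper that is not shown here. Judging from how it is used, a plausible sketch (an assumption, not the project's exact code) is a response hook that only throttles requests that were not answered from the cache:

def make_throttle_hook(self, throttle):
    """ Return a response hook that waits only for non-cached responses """
    def hook(response, *args, **kwargs):
        if not getattr(response, 'from_cache', False):
            # Assumed Throttle interface: wait(url) sleeps if the same
            # domain was requested too recently.
            throttle.wait(response.url)
        return response
    return hook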
Project: Python-Web-Scraping-Second-Edition    Author: PacktPublishing
def download(self, url, headers, proxies):
        """ Download a and return the page content
            args:
                url (str): URL
                headers (dict): dict of headers (like user_agent)
                proxies (dict): proxy dict w/ keys 'http'/'https', values
                    are strs (i.e. 'http(s)://IP') (default: None)
        """
        session = requests_cache.CachedSession()
        session.hooks = {'response': self.make_throttle_hook(self.throttle)}

        try:
            resp = session.get(url, headers=headers, proxies=proxies,
                               timeout=self.timeout)
            html = resp.text
            if resp.status_code >= 400:
                print('Download error:', resp.text)
                html = None
                if self.num_retries and 500 <= resp.status_code < 600:
                    # recursively retry 5xx HTTP errors
                    self.num_retries -= 1
                    return self.download(url, headers, proxies)
        except requests.exceptions.RequestException as e:
            print('Download error:', e)
            return {'html': None, 'code': 500}
        return {'html': html, 'code': resp.status_code}
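Both download() implementations above are identical, and both depend on requests_cache tagging the responses it serves with a from_cache attribute, which is what makes cache-aware throttling possible. A quick way to check that the cache is working (the URL is illustrative):

import requests_cache

session = requests_cache.CachedSession()
first = session.get('http://example.python-scraping.com')
second = session.get('http://example.python-scraping.com')
print(getattr(first, 'from_cache', False))   # False: fetched over the network
print(getattr(second, 'from_cache', False))  # True: served from the cache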