Python requests_cache module: install_cache() code examples

The following 22 code examples, extracted from open-source Python projects, show how requests_cache.install_cache() is used in practice.
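
Before the project examples, here is a minimal sketch of typical usage (the cache name, backend, and expiry below are illustrative choices, not values taken from any of the projects): install_cache() patches requests globally, so later requests.get() calls are transparently served from the cache.

import requests
import requests_cache

# Install a global cache: an SQLite file named demo_cache.sqlite in the
# working directory, with entries expiring after one hour.
requests_cache.install_cache('demo_cache', backend='sqlite', expire_after=3600)

requests.get('https://httpbin.org/get')             # network hit, stored in the cache
response = requests.get('https://httpbin.org/get')  # served from the cache
print(response.from_cache)                          # True once the second call hits the cache

requests_cache.uninstall_cache()                    # restore normal requests behaviour
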

Project: cbapi-python    Author: carbonblack
def main(argv):
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.cache_name:
        parser.print_help()
        sys.exit(-1)

    global cache_file_name
    cache_file_name = opts.cache_name
    requests_cache.install_cache(cache_file_name, allowable_methods=('GET', 'POST'))

    global cb
    cb = CbEnterpriseResponseAPI()

    large_process_search()
    large_binary_search()
    sensor_search()
    watchlist_search()
    feed_search()
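
Note the allowable_methods=('GET', 'POST') argument: in the requests_cache versions these projects use, only GET requests are cached by default, so POST has to be allowed explicitly, presumably because some cbapi queries go over POST. A small sketch of checking whether a response came out of the cache, using the from_cache attribute requests_cache adds once a cache is installed (the URL is just a placeholder):

import requests
import requests_cache

requests_cache.install_cache('cb_demo', allowable_methods=('GET', 'POST'))

r = requests.post('https://httpbin.org/post', data={'q': 'example'})
print(r.from_cache)   # False on the first call, True on an identical repeat
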
Project: cti-stix-validator    Author: oasis-open
def init_requests_cache(refresh_cache=False):
    """
    Initializes a cache which the ``requests`` library will consult for
    responses, before making network requests.

    :param refresh_cache: Whether the cache should be cleared out
    """
    # Cache data from external sources; used in some checks
    dirs = AppDirs("stix2-validator", "OASIS")
    # Create the cache dir if it doesn't exist
    try:
        os.makedirs(dirs.user_cache_dir)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
    requests_cache.install_cache(
        cache_name=os.path.join(dirs.user_cache_dir, 'py{}cache'.format(
            sys.version_info[0])),
        expire_after=datetime.timedelta(weeks=1))

    if refresh_cache:
        clear_requests_cache()
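
Two details worth noting: expire_after accepts a datetime.timedelta (entries older than a week are refetched), and embedding sys.version_info[0] in the cache name presumably keeps Python 2 and Python 3 cache files separate. clear_requests_cache() is defined elsewhere in the validator; a plausible, hypothetical sketch of such a helper, assuming it simply empties whatever cache install_cache() set up:

import requests_cache

def clear_requests_cache():
    """Hypothetical helper: empty the currently installed requests cache."""
    requests_cache.clear()
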
Project: loggingnight    Author: kdknigga
def enable_cache(expire_after=691200):
        if 'requests_cache' not in modules:
            return False

        requests_cache.install_cache('loggingnight_cache', backend='sqlite', expire_after=expire_after)
        return True
Project: manubot    Author: greenelab
def generate_csl_items(args, citation_df):
    """
    Generate CSL (citeproc) items for standard_citations in citation_df.
    Writes references.json to disk and logs warnings for potential problems.
    """
    # Read manual references (overrides) in JSON CSL
    manual_refs = read_manual_references(args.manual_references_path)

    requests_cache.install_cache(args.requests_cache_path, include_get_headers=True)
    cache = requests_cache.get_cache()
    if args.clear_requests_cache:
        logging.info('Clearing requests-cache')
        requests_cache.clear()
    logging.info(f'requests-cache starting with {len(cache.responses)} cached responses')

    csl_items = list()
    failures = list()
    for citation in citation_df.standard_citation.unique():
        if citation in manual_refs:
            csl_items.append(manual_refs[citation])
            continue
        try:
            citeproc = citation_to_citeproc(citation)
            csl_items.append(citeproc)
        except Exception as error:
            logging.exception(f'Citeproc retrieval failure for {citation}')
            failures.append(citation)

    logging.info(f'requests-cache finished with {len(cache.responses)} cached responses')
    requests_cache.uninstall_cache()

    if failures:
        message = 'Citeproc retrieval failed for:\n{}'.format(
            '\n'.join(failures))
        logging.error(message)

    # Write JSON CSL bibliography for Pandoc.
    with args.references_path.open('w') as write_file:
        json.dump(csl_items, write_file, indent=2, ensure_ascii=False)
        write_file.write('\n')
    return csl_items
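
The install_cache() / uninstall_cache() pair above scopes caching to this one function. In the classic requests_cache API the same effect can be had with the enabled() context manager, which takes the same arguments as install_cache(); a minimal sketch (cache name and URL are placeholders):

import requests
import requests_cache

# Only requests made inside the with-block go through the cache.
with requests_cache.enabled('csl_demo', include_get_headers=True):
    requests.get('https://httpbin.org/get')
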
Project: cryptop    Author: huwwp
def main():
    if os.path.isfile(BASEDIR):
        sys.exit('Please remove your old configuration file at {}'.format(BASEDIR))
    os.makedirs(BASEDIR, exist_ok=True)

    global CONFIG
    CONFIG = read_configuration(CONFFILE)
    locale.setlocale(locale.LC_MONETARY, CONFIG['locale'].get('monetary', ''))

    requests_cache.install_cache(cache_name='api_cache', backend='memory',
        expire_after=int(CONFIG['api'].get('cache', 10)))

    curses.wrapper(mainc)
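
The backend='memory' choice means the cache lives only in the process's memory and disappears when cryptop exits; with the short expiry (10 seconds by default, overridable via the 'cache' key in the api section of the configuration) it presumably just avoids re-hitting the price API on every screen refresh of the curses UI.
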
Project: synonym    Author: gavinzbq
def _enable_cache():
    if not os.path.exists(CACHE_DIR):
        os.makedirs(CACHE_DIR)
    requests_cache.install_cache(CACHE_FILE)
Project: statsnba-playbyplay    Author: ethanluoyc
def __init__(self, cache=False,
                 cache_filename="requests.cache"):
        self._cache = cache
        if cache:
            requests_cache.install_cache(cache_filename)
        self._transform_json = True
Project: statsnba-playbyplay    Author: ethanluoyc
def use_requests_cache():
    import requests_cache
    requests_cache.install_cache('test_cache')
Project: statsnba-playbyplay    Author: ethanluoyc
def pytest_runtest_setup(item):
    # called for running each test in 'a' directory
    import requests_cache
    requests_cache.install_cache('test_cache')
Project: statsnba-playbyplay    Author: ethanluoyc
def pytest_configure(config):
    if config.getoption('--use-cache'):
        import requests_cache
        requests_cache.install_cache('test_cache')
    api = Api()
    pytest.game_ids = api.GetSeasonGameIDs('2009-10', 'Regular Season')[:2]  # Hack to carry the gameids to tests
    pytest.game_ids = ['0020900292']
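
Installing a cache named 'test_cache' from these pytest hooks means repeated test runs replay recorded HTTP responses from test_cache.sqlite instead of hitting the live NBA stats endpoints, which makes the suite faster and usable offline once the cache is populated.
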
Project: tbcnn    Author: crestonbunch
def fetch(outfile):
    """The main function for downloading all scripts from github."""
    if not os.path.exists(REQUESTS_CACHE):
        os.makedirs(REQUESTS_CACHE)

    requests_cache.install_cache(REQUESTS_CACHE)

    result = []

    label_counts = defaultdict(int)

    print('Fetching scripts')
    for label, url in DATA_URLS.items():
        print(url)
        scripts = fetch_scripts(url)
        for script in scripts:
            try:
                result.append({
                    'tree': build_tree(script), 'metadata': {'label': label}
                })
                label_counts[label] += 1
            except Exception as err:
                print(err)

    print('Label counts: ', label_counts)

    print('Dumping scripts')
    with open(outfile, 'wb') as file_handler:
        pickle.dump(result, file_handler)
Project: query_tcga    Author: jburos
def setup_cache():
    global SESSION
    if get_setting_value('USE_CACHE'):
        import requests_cache
        requests_cache.install_cache(cache_name='gdc_cache', backend='sqlite', expire_after=18000)
    #   import cachecontrol
    #   from cachecontrol.caches import FileCache
    #   SESSION = cachecontrol.CacheControl(requests.Session(), cache=FileCache('.web_cache', forever=True))
    #else:
    #    SESSION = requests.Session()
Project: mygene.py    Author: biothings
def set_caching(self, cache_db='mygene_cache', verbose=True, **kwargs):
        ''' Installs a local cache for all requests.
            **cache_db** is the path to the local sqlite cache database.'''
        if caching_avail:
            requests_cache.install_cache(cache_name=cache_db, allowable_methods=('GET', 'POST'), **kwargs)
            self._cached = True
            if verbose:
                print('[ Future queries will be cached in "{0}" ]'.format(os.path.abspath(cache_db + '.sqlite')))
        else:
            print("Error: The requests_cache python module is required to use request caching.")
            print("See - https://requests-cache.readthedocs.io/en/latest/user_guide.html#installation")
        return
Project: resaspy    Author: ar90n
def __init__(self, key, version, cache_name=None, backend=None, **backend_options):
            self.__key = key
            self.__version = version
            self.__endpoint = 'https://opendata.resas-portal.go.jp'
            if cache_name is not None:
                requests_cache.install_cache(cache_name,
                                             backend,
                                             **backend_options)
Project: slackbot    Author: cybernetisk
def __init__(self):
        with open(os.path.join(os.path.dirname(__file__), 'cafeteria.json')) as f:
            self.cafeterias = json.load(f)
        self.url = 'https://sio.no/mat-og-drikke/_window/mat+og+drikke+-+dagens+middag?s={}'
        requests_cache.install_cache('sio', expire_after=360)
Project: slackbot    Author: cybernetisk
def get_from_api(url, params=None, encoding=None, cache=False, cachename='default',
                 cache_expiration=60):
    """
    Common method to get information from a REST API that doesn't use authentication
    :param url: URL for the API
    :param params: the parameters for the request
    :param encoding: overrides the detected encoding
    :param cache: use the cache (default: False)
    :param cachename: name of the cache
    :param cache_expiration: how long until the cache expires, in seconds (default: 60)

    :return:
    """
    # Install the cache before issuing the request so this response is cached too
    if cache:
        requests_cache.install_cache(cachename, expire_after=cache_expiration)
    response = requests.get(url, params=params)
    if response.encoding is None:
        if encoding is None:
            response.encoding = chardet.detect(response.raw.data)['encoding']
        else:
            response.encoding = encoding
    if response.status_code != 200:
        raise Exception('%s:%s' % (response.status_code, response.text))
    try:
        return json.loads(response.text)
    except Exception as e:
        raise Exception('Can\'t parse the json string\n %s' % url)
Project: craigslist-rental-market    Author: brbsix
def run(self, cache=True):
        """Run application."""

        self._query()

        # configure `requests` cache
        if cache:
            cache_dir = appdirs.user_cache_dir('craigslist')
            os.makedirs(cache_dir, exist_ok=True)
            requests_cache.install_cache(
                cache_name=os.path.join(cache_dir, 'craigslist'),
                expire_after=timedelta(hours=0.5))

        print('Running query...\n')

        # record the start time
        start = time.time()

        self.prices = self._getprices()

        # determine elapsed time of queries
        self.duration = time.time() - start

        # remove expired cache entries
        if cache:
            requests_cache.core.remove_expired_responses()

        # print statistics (if any price data exists)
        if self.prices:
            self._print()
        else:
            print('Nothing found for that search.')
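
This example (and the Tenma one that follows) calls requests_cache.core.remove_expired_responses(); in the requests_cache versions these projects target, that prunes entries older than the configured expire_after from the backend, so the on-disk SQLite cache does not grow without bound.
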
Project: Tenma    Author: Tenma-Server
def __init__(self):
        # Configure logging
        logging.getLogger("requests").setLevel(logging.WARNING)
        self.logger = logging.getLogger('tenma')

        # Setup requests caching
        requests_cache.install_cache('./media/CACHE/comicvine-cache', expire_after=1800)
        requests_cache.core.remove_expired_responses()

        # Set basic reusable strings
        self.api_key = Settings.get_solo().api_key
        self.directory_path = 'files'

        # API Strings
        self.baseurl = 'https://comicvine.gamespot.com/api/'
        self.imageurl = 'https://comicvine.gamespot.com/api/image/'
        self.base_params = { 'format': 'json', 'api_key': self.api_key }
        self.headers = { 'user-agent': 'tenma' }

        # API field strings
        self.arc_fields = 'deck,description,id,image,name,site_detail_url'
        self.character_fields = 'deck,description,id,image,name,site_detail_url'
        self.creator_fields = 'deck,description,id,image,name,site_detail_url'
        self.issue_fields = 'api_detail_url,character_credits,cover_date,deck,description,id,image,issue_number,name,person_credits,site_detail_url,story_arc_credits,team_credits,volume'
        self.publisher_fields = 'deck,description,id,image,name,site_detail_url'
        self.query_issue_fields = 'cover_date,id,issue_number,name,volume'
        self.query_issue_limit = '100'
        self.series_fields = 'api_detail_url,deck,description,id,name,publisher,site_detail_url,start_year'
        self.team_fields = 'characters,deck,description,id,image,name,site_detail_url'

        # International reprint publishers
        # Ordered by # of issues (est.) for quick matching.
        self.int_pubs = [
            2350,   # Panini (21.5k)
            2812,   # Marvel UK (4.2k)
            2094,   # Abril (2.1k)
            2319,   # Planeta DeAgostini (2.1k)
            2903,   # Ediciones Zinco (0.7k)
            1133,   # Semic As (0.3k)
            2961,   # Marvel Italia (0.04k)
        ]

Project: gatk-cwl-generator    Author: wtsi-hgi
def main():
    parser = argparse.ArgumentParser(description='Generates CWL files from the GATK documentation')
    parser.add_argument("--version", "-v", dest='gatkversion', default="3.5",
        help="Sets the version of GATK to parse documentation for. Default is 3.5")
    parser.add_argument('--out', "-o", dest='outputdir',
        help="Sets the output directory for generated files. Default is ./gatk_cmdline_tools/<VERSION>/")
    parser.add_argument('--include', dest='include_file',
        help="Only generate this file (note, CommandLinkGATK has to be generated for v3.x)")
    parser.add_argument("--dev", dest="dev", action="store_true",
        help="Enable network caching and overwriting of the generated files (for development purposes). " + 
        "Requires requests_cache to be installed")
    parser.add_argument("--docker_container_name", "-c", dest="docker_container_name",
        help="Docker container name for generated cwl files. Default is 'broadinstitute/gatk3:<VERSION>' " + 
        "for version 3.x and 'broadinstitute/gatk:<VERSION>' for 4.x")
    parser.add_argument("--gatk_location", "-l", dest="gatk_location",
        help="Location of the gatk jar file. Default is '/usr/GenomeAnalysisTK.jar' for gatk 3.x and '/gatk/gatk.jar' for gatk 4.x")
    cmd_line_options = parser.parse_args()


    if cmd_line_options.dev:
        import requests_cache
        requests_cache.install_cache() # Decreases the time to run dramatically

    if not cmd_line_options.outputdir:
        cmd_line_options.outputdir = os.getcwd() + '/gatk_cmdline_tools/' + cmd_line_options.gatkversion

    if not cmd_line_options.docker_container_name:
        if is_version_3(cmd_line_options.gatkversion):
            cmd_line_options.docker_container_name = "broadinstitute/gatk3:" + cmd_line_options.gatkversion
        else:
            cmd_line_options.docker_container_name = "broadinstitute/gatk:" + cmd_line_options.gatkversion

    if not cmd_line_options.gatk_location:
        if is_version_3(cmd_line_options.gatkversion):
            cmd_line_options.gatk_location = "/usr/GenomeAnalysisTK.jar"
        else:
            cmd_line_options.gatk_location = "/gatk/gatk.jar"

    print("Your chosen directory is: %s" % cmd_line_options.outputdir)
    grouped_urls = get_json_links(cmd_line_options.gatkversion)

    generate_cwl_and_json_files(cmd_line_options.outputdir, grouped_urls, cmd_line_options)
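
Called with no arguments, install_cache() falls back to its defaults in the classic API: an SQLite backend writing cache.sqlite to the current directory, with no expiry, which is presumably why the generator only enables it behind the --dev flag.
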
Project: wswp    Author: kjam
def link_crawler(start_url, link_regex, robots_url=None, user_agent='wswp',
                 proxies=None, delay=3, max_depth=4, num_retries=2, expires=timedelta(days=30)):
    """ Crawl from the given start URL following links matched by link_regex. In the current
        implementation, we do not actually scrape any information.

        args:
            start_url (str): web site to start crawl
            link_regex (str): regex to match for links
        kwargs:
            robots_url (str): url of the site's robots.txt (default: start_url + /robots.txt)
            user_agent (str): user agent (default: wswp)
            proxies (list of dicts): a list of possible dicts for http / https proxies
                For formatting, see the requests library
            delay (int): seconds to throttle between requests to one domain (default: 3)
            max_depth (int): maximum crawl depth (to avoid traps) (default: 4)
            num_retries (int): # of retries when 5xx error (default: 2)
            expires (timedelta): timedelta for cache expirations (default: 30 days)
    """
    crawl_queue = [start_url]
    # keep track of which URLs have been seen before
    seen = {}
    requests_cache.install_cache(backend='redis', expire_after=expires)
    if not robots_url:
        robots_url = '{}/robots.txt'.format(start_url)
    rp = get_robots_parser(robots_url)
    D = Downloader(delay=delay, user_agent=user_agent, proxies=proxies)
    while crawl_queue:
        url = crawl_queue.pop()
        # check url passes robots.txt restrictions
        if rp.can_fetch(user_agent, url):
            depth = seen.get(url, 0)
            if depth == max_depth:
                print('Skipping %s due to depth' % url)
                continue
            html = D(url, num_retries=num_retries)
            if not html:
                continue
            # TODO: add actual data scraping here
            # filter for links matching our regular expression
            for link in get_links(html):
                if re.match(link_regex, link):
                    abs_link = urljoin(start_url, link)
                    if abs_link not in seen:
                        seen[abs_link] = depth + 1
                        crawl_queue.append(abs_link)
        else:
            print('Blocked by robots.txt:', url)
Project: Python-Web-Scraping-Second-Edition    Author: PacktPublishing
def link_crawler(start_url, link_regex, robots_url=None, user_agent='wswp',
                 proxies=None, delay=3, max_depth=4, num_retries=2, expires=timedelta(days=30)):
    """ Crawl from the given start URL following links matched by link_regex. In the current
        implementation, we do not actually scrape any information.

        args:
            start_url (str): web site to start crawl
            link_regex (str): regex to match for links
        kwargs:
            robots_url (str): url of the site's robots.txt (default: start_url + /robots.txt)
            user_agent (str): user agent (default: wswp)
            proxies (list of dicts): a list of possible dicts for http / https proxies
                For formatting, see the requests library
            delay (int): seconds to throttle between requests to one domain (default: 3)
            max_depth (int): maximum crawl depth (to avoid traps) (default: 4)
            num_retries (int): # of retries when 5xx error (default: 2)
            expires (timedelta): timedelta for cache expirations (default: 30 days)
    """
    crawl_queue = [start_url]
    # keep track of which URLs have been seen before
    seen = {}
    requests_cache.install_cache(backend='redis', expire_after=expires)
    if not robots_url:
        robots_url = '{}/robots.txt'.format(start_url)
    rp = get_robots_parser(robots_url)
    D = Downloader(delay=delay, user_agent=user_agent, proxies=proxies)
    while crawl_queue:
        url = crawl_queue.pop()
        # check url passes robots.txt restrictions
        if rp.can_fetch(user_agent, url):
            depth = seen.get(url, 0)
            if depth == max_depth:
                print('Skipping %s due to depth' % url)
                continue
            html = D(url, num_retries=num_retries)
            if not html:
                continue
            # TODO: add actual data scraping here
            # filter for links matching our regular expression
            for link in get_links(html):
                if re.match(link_regex, link):
                    abs_link = urljoin(start_url, link)
                    if abs_link not in seen:
                        seen[abs_link] = depth + 1
                        crawl_queue.append(abs_link)
        else:
            print('Blocked by robots.txt:', url)
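
Both crawler variants use backend='redis', which requires the redis package and a running Redis server (localhost:6379 by default). A sketch of pointing the cache at a non-default Redis instance, assuming the classic redis backend's connection option:

import redis
import requests_cache
from datetime import timedelta

# Reuse an explicit connection instead of the implicit localhost default.
connection = redis.StrictRedis(host='localhost', port=6379, db=1)
requests_cache.install_cache('crawl_cache', backend='redis',
                             expire_after=timedelta(days=30),
                             connection=connection)
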
Project: probe-scraper    Author: mozilla
def scrape(folder=None):
    """
    Returns data in the format:
    {
      node_id: {
        channels: [channel_name, ...],
        version: string,
        registries: {
          histogram: [path, ...]
          event: [path, ...]
          scalar: [path, ...]
        }
      },
      ...
    }
    """
    if folder is None:
        folder = tempfile.mkdtemp()
    error_cache = load_error_cache(folder)
    requests_cache.install_cache(os.path.join(folder, 'probe_scraper_cache'))
    results = defaultdict(dict)

    for channel in CHANNELS.iterkeys():
        tags = load_tags(channel)
        versions = extract_tag_data(tags, channel)
        save_error_cache(folder, error_cache)

        print "\n" + channel + " - extracted version data:"
        for v in versions:
            print "  " + str(v)

        print "\n" + channel + " - loading files:"
        for v in versions:
            print "  from: " + str(v)
            files = download_files(channel, v['node'], folder, error_cache)
            results[channel][v['node']] = {
                'channel': channel,
                'version': v['version'],
                'registries': files,
            }
            save_error_cache(folder, error_cache)

    return results