我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用vcr.VCR。
def make_vcr(): cassette_library_dir = os.path.join(os.path.dirname(__file__), 'fixtures', 'cassettes') return VCR(cassette_library_dir=cassette_library_dir, filter_query_parameters=[ 'key', 'oauth_consumer_key', 'oauth_nonce', 'oauth_signature_method', 'oauth_timestamp', 'oauth_token', 'oauth_version', 'oauth_signature', ], record_mode='new_episodes')
def vcr(self): """ Returns a new vcrpy instance. """ cassettes_dir = join(dirname(getfile(self.__class__)), 'cassettes') kwargs = { 'record_mode': getattr(self, 'vcrpy_record_mode', 'once'), 'cassette_library_dir': cassettes_dir, 'match_on': ['method', 'scheme', 'host', 'port', 'path', 'query'], 'filter_query_parameters': FILTER_QUERY_PARAMS, 'filter_post_data_parameters': FILTER_QUERY_PARAMS, 'before_record_response': IGittTestCase.remove_link_headers, 'filter_headers': ['Link'], } kwargs.update(self.vcr_options) return VCR(**kwargs)
def _post_recording_scrub(self): """ Perform post-recording cleanup on the YAML file that can't be accomplished with the VCR recording hooks. """ src_path = self.cassette_path rg_name = getattr(self, 'resource_group', None) rg_original = getattr(self, 'resource_group_original', None) t = tempfile.NamedTemporaryFile('r+') with open(src_path, 'r') as f: for line in f: # scrub resource group names if rg_original and rg_name != rg_original: line = line.replace(rg_name, rg_original) # omit bearer tokens if 'authorization:' not in line.lower(): t.write(line) t.seek(0) with open(src_path, 'w') as f: for line in t: f.write(line) t.close() # COMMAND METHODS
def _post_recording_scrub(self): """ Perform post-recording cleanup on the YAML file that can't be accomplished with the VCR recording hooks. """ src_path = self.cassette_path rg_name = getattr(self, 'resource_group', None) rg_original = getattr(self, 'resource_group_original', None) t = tempfile.NamedTemporaryFile('r+') with open(src_path, 'r') as f: for line in f: # scrub resource group names if rg_name != rg_original: line = line.replace(rg_name, rg_original) # omit bearer tokens if 'authorization:' not in line.lower(): t.write(line) t.seek(0) with open(src_path, 'w') as f: for line in t: f.write(line) t.close() # COMMAND METHODS
def __init__(self, # pylint: disable=too-many-arguments method_name, config_file=None, recording_dir=None, recording_name=None, recording_processors=None, replay_processors=None, recording_patches=None, replay_patches=None): super(ReplayableTest, self).__init__(method_name) self.recording_processors = recording_processors or [] self.replay_processors = replay_processors or [] self.recording_patches = recording_patches or [] self.replay_patches = replay_patches or [] self.config = TestConfig(config_file=config_file) self.disable_recording = False test_file_path = inspect.getfile(self.__class__) recording_dir = recording_dir or os.path.join(os.path.dirname(test_file_path), 'recordings') self.is_live = self.config.record_mode self.vcr = vcr.VCR( cassette_library_dir=recording_dir, before_record_request=self._process_request_recording, before_record_response=self._process_response_recording, decode_compressed_response=True, record_mode='once' if not self.is_live else 'all', filter_headers=self.FILTER_HEADERS ) self.vcr.register_matcher('query', self._custom_request_query_matcher) self.recording_file = os.path.join( recording_dir, '{}.yaml'.format(recording_name or method_name) ) if self.is_live and os.path.exists(self.recording_file): os.remove(self.recording_file) self.in_recording = self.is_live or not os.path.exists(self.recording_file) self.test_resources_count = 0 self.original_env = os.environ.copy()
def pytest_configure(config): # register the online marker config.addinivalue_line('markers', 'online: mark a test that goes online. VCR will automatically be used.')
def _get_vcr(self): testdir = os.path.dirname(inspect.getfile(self.__class__)) cassettes_dir = os.path.join(testdir, 'cassettes') return vcr.VCR( record_mode=self.vcrpy_record_mode, cassette_library_dir=cassettes_dir, match_on=['method', 'scheme', 'host', 'port', 'path'], filter_query_parameters=FILTER_QUERY_PARAMS, filter_post_data_parameters=FILTER_QUERY_PARAMS, before_record_response=GitmateTestCase.remove_link_headers)
def get_vcr(filepath): return vcr.VCR( path_transformer=vcr.VCR.ensure_suffix('.yaml'), match_on=("method", "scheme", "host", "port", "path", "query", "body"), cassette_library_dir=os.path.splitext(filepath)[0] )
def __init__(self, test_file, test_name, run_live=False, debug=False, debug_vcr=False, skip_setup=False, skip_teardown=False): super(VCRTestBase, self).__init__(test_name) self.test_name = test_name self.recording_dir = find_recording_dir(test_file) self.cassette_path = os.path.join(self.recording_dir, '{}.yaml'.format(test_name)) self.playback = os.path.isfile(self.cassette_path) if os.environ.get(ENV_LIVE_TEST, None) == 'True': self.run_live = True else: self.run_live = run_live self.skip_setup = skip_setup self.skip_teardown = skip_teardown self.success = False self.exception = None self.track_commands = os.environ.get(COMMAND_COVERAGE_CONTROL_ENV, None) self._debug = debug if not self.playback and ('--buffer' in sys.argv) and not run_live: self.exception = CLIError('No recorded result provided for {}.'.format(self.test_name)) if debug_vcr: logging.basicConfig() vcr_log = logging.getLogger('vcr') vcr_log.setLevel(logging.INFO) self.my_vcr = vcr.VCR( cassette_library_dir=self.recording_dir, before_record_request=self._before_record_request, before_record_response=self._before_record_response, decode_compressed_response=True ) self.my_vcr.register_matcher('custom', _custom_request_matcher) self.my_vcr.match_on = ['custom']
def __init__(self, cli, method_name, filter_headers=None): super(ScenarioTest, self).__init__(cli, method_name) self.name_replacer = GeneralNameReplacer() self.recording_processors = [LargeRequestBodyProcessor(), LargeResponseBodyProcessor(), self.name_replacer] self.replay_processors = [LargeResponseBodyReplacer()] self.filter_headers = filter_headers or [] test_file_path = inspect.getfile(self.__class__) recordings_dir = find_recording_dir(test_file_path) live_test = os.environ.get(ENV_LIVE_TEST, None) == 'True' self.vcr = vcr.VCR( cassette_library_dir=recordings_dir, before_record_request=self._process_request_recording, before_record_response=self._process_response_recording, decode_compressed_response=True, record_mode='once' if not live_test else 'all', filter_headers=self.filter_headers ) self.vcr.register_matcher('query', self._custom_request_query_matcher) self.recording_file = os.path.join(recordings_dir, '{}.yaml'.format(method_name)) if live_test and os.path.exists(self.recording_file): os.remove(self.recording_file) self.in_recording = live_test or not os.path.exists(self.recording_file) self.test_resources_count = 0 self.original_env = os.environ.copy()
def __init__(self, test_file, test_name, run_live=False, debug=False, debug_vcr=False, skip_setup=False, skip_teardown=False): super(VCRTestBase, self).__init__(test_name) self.test_name = test_name self.recording_dir = os.path.join(os.path.dirname(test_file), 'recordings') self.cassette_path = os.path.join(self.recording_dir, '{}.yaml'.format(test_name)) self.playback = os.path.isfile(self.cassette_path) if os.environ.get(LIVE_TEST_CONTROL_ENV, None) == 'True': self.run_live = True else: self.run_live = run_live self.skip_setup = skip_setup self.skip_teardown = skip_teardown self.success = False self.exception = None self.track_commands = os.environ.get(COMMAND_COVERAGE_CONTROL_ENV, None) self._debug = debug if not self.playback and ('--buffer' in sys.argv) and not run_live: self.exception = CLIError('No recorded result provided for {}.'.format(self.test_name)) if debug_vcr: import logging logging.basicConfig() vcr_log = logging.getLogger('vcr') vcr_log.setLevel(logging.INFO) self.my_vcr = vcr.VCR( cassette_library_dir=self.recording_dir, before_record_request=self._before_record_request, before_record_response=self._before_record_response, decode_compressed_response=True ) self.my_vcr.register_matcher('custom', _custom_request_matcher) self.my_vcr.match_on = ['custom']
def record_requests(request): # vcr interceptor global REQUESTS global REQUESTID if type(request) == dict: thistype = 'response' else: REQUESTID += 1 thistype = 'request' if thistype == 'request': ydata = {} ydata['request'] = {} ydata['request']['method'] = request.method ydata['request']['uri'] = request.uri if request.headers: ydata['request']['headers'] = {} for x in request.headers.items(): ydata['request']['headers'][x[0]] = x[1] else: ydata['request']['headers'] = request.headers ydata['request']['body'] = literal(pretty_xml(request.body)) REQUESTS.append(ydata) else: ydata = REQUESTS[-1] ydata['response'] = request.copy() REQUESTS[-1]['response'] = request.copy() REQUESTS[-1]['response']['body'] = literal(pretty_xml(request['body']['string'])) fid = str(REQUESTID).rjust(4, '0') fname = os.path.join('/tmp', 'fixtures', '%s.yml' % (fid)) with open(fname, 'wb') as f: yaml.dump(REQUESTS[-1], f, width=1000, indent=2) return request # Make a custom VCR instance so we can inject our spyware into it
def _run(self, args, working_dir, output_filename, request_json_builder = None, live_request_json_builder = None): ''' Runs the test given the command line arguments, and the output filename. :type args: list(str) representing the components of the command line argument. :c output_filename: str represents the filename of the file to write the command line output to. ''' self.logger.info("%s" % (' '.join(args))) self._reset_output_file(output_filename) def match_um_requests_on(r1, r2): ''' Custom uri matcher for use with vcrpy. Basically it provides custom matching for user and action UM requests, which ignores the org ID portion of the request. Otherwise, the request uri must match exactly. :param r1: :param r2: :return: ''' if re.match(self.test_suite_config['users_request_matcher'], r1.uri): if re.match(self.test_suite_config['users_request_matcher'], r2.uri): return True if re.match(self.test_suite_config['actions_request_matcher'], r2.uri): if re.match(self.test_suite_config['actions_request_matcher'], r2.uri): return True return r1.uri == r2.uri record_mode = 'all' if self.server_config['live_mode'] else 'none' mock_spec = 'proxy' if self.server_config['live_mode'] else 'playback' recorder = vcr.VCR( record_mode=record_mode, match_on=['um_request'], # match_on=match_um_requests_on, decode_compressed_response=True, filter_headers=['authorization'] ) recorder.register_matcher('um_request',match_um_requests_on) with recorder.use_cassette(self.server_config['cassette_filename']) as cassette: service = server.TestService(self.server_config, cassette, request_json_builder) service.run() with open(output_filename, 'w') as output_file: subprocess.call(args, cwd=working_dir, env=dict(os.environ, UMAPI_MOCK=mock_spec), shell=IS_NT_PLATFORM, stdin=None, stdout=output_file, stderr=output_file) # p = subprocess.Popen(args, cwd=working_dir, stdout=output_file, stderr=output_file) # output_bytes = subprocess.check_output(cmd, cwd=working_dir, shell=True) # output_file.write(output_bytes.decode()) # p.communicate() service.stop() if live_request_json_builder: for stored_request, stored_response in cassette.data: if stored_request.body: live_request_json_builder.extend_with_json_string(stored_request.body)
def use_vcr(request, monkeypatch): """ This fixture is applied automatically to any test using the `online` mark. It will record and playback network sessions using VCR. The record mode of VCR can be set using the VCR_RECORD_MODE environment variable when running tests. """ if VCR_RECORD_MODE == 'off': yield None else: path_segments = [VCR_CASSETTE_DIR] if request.module is not None: path_segments.extend(request.module.__name__.split('tests.')[-1].split('.')) if request.cls is not None: path_segments.append(request.cls.__name__) # Take into account the parametrization set by pytest # to create many tests from a single function with many parameters. request_keywords = request.keywords.keys() if 'parametrize' in request_keywords: try: param_name = [x for x in request_keywords if x.startswith('_with_')][0] except IndexError: param_name = '' else: param_name = '' path_segments.append(request.function.__name__ + param_name + '.yaml') cassette_path = os.path.join(*path_segments) filter_query = [('applicationId', 'XXXXXX')] filter_headers = [('Authorization', 'ESA XXXXXX')] filter_post = [] online = True if vcr.record_mode == 'none': online = False elif vcr.record_mode == 'once': online = not os.path.exists(cassette_path) # If we are going online, check the credentials if online: if os.environ.get("RAKUTEN_APP_ID", None) is None: pytest.skip('need credentials to run this test') with vcr.use_cassette(path=cassette_path, filter_query_parameters=filter_query, filter_headers=filter_headers, filter_post_data_parameters=filter_post, decode_compressed_response=True) as cassette: yield cassette