我们从 Python 开源项目中，提取了以下 45 个代码示例，用于说明如何使用 responses.RequestsMock()。
def test_downloader_conn_error(self):
    """Run the downloader against an MPD URL that always raises ConnectionError.

    One failing response is registered for the first attempt plus one per
    allowed retry; ``assert_all_requests_are_fired=True`` requires all of
    them to be consumed. Stitching afterwards must not produce a file.
    """
    exception = ConnectionError()
    max_retry = 3
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        for _ in range(max_retry + 1):
            rsps.add(responses.GET, self.TEST_MPD_URL, body=exception)

        dl = live.Downloader(
            mpd=self.TEST_MPD_URL,
            output_dir='output_connerror',
            duplicate_etag_retry=2,
            singlethreaded=True,
            max_connection_error_retry=max_retry)
        dl.run()
        dl.stream_id = '17875351285037717'
        output_file = 'output_connerror.mp4'
        dl.stitch(output_file, cleartempfiles=True)
        # assertFalse fails when the file exists, so the failure message
        # has to report that the file *was* generated.
        self.assertFalse(
            os.path.isfile(output_file),
            '{0!s} is generated'.format(output_file))
def setup_versions(self, stable, devel, nightly, esr, esr_next=None):
    """
    Add a mock response for official versions

    Registers a GET response for ``versions.URL_VERSIONS`` carrying the given
    version numbers, starts the mock, yields it, and stops it afterwards.
    The mock is stopped even when an exception is raised at the ``yield``
    point, so ``requests`` is never left patched.
    """
    self.cleanup()
    body = {
        "FIREFOX_NIGHTLY": nightly,
        "FIREFOX_ESR": esr,
        "FIREFOX_ESR_NEXT": esr_next,
        "LATEST_FIREFOX_DEVEL_VERSION": devel,
        "LATEST_FIREFOX_OLDER_VERSION": "3.6.28",
        "LATEST_FIREFOX_RELEASED_DEVEL_VERSION": devel,
        "LATEST_FIREFOX_VERSION": stable,
    }
    local_mock = responses.RequestsMock()
    local_mock.add(responses.GET, versions.URL_VERSIONS, json=body)
    local_mock.start()
    try:
        yield local_mock
    except BaseException:
        # Un-patch requests, but skip the "all requests fired" assertion so
        # it cannot mask the error that is already propagating.
        local_mock.stop(allow_assert=False)
        raise
    else:
        local_mock.stop()
def test_create_new_user(self):
    """Create a user against a mocked 201 response and check the message."""
    expected_message = 'Check your email for confirmation code'
    with responses.RequestsMock() as rsps:
        rsps.add(
            self.deform_client.user.create.method.upper(),
            self.deform_client.user.create.get_context({})['url'],
            json={'message': expected_message},
            status=201,
        )
        email = 'a' + self.CONFIG['DEFORM']['EMAIL']
        password = self.CONFIG['DEFORM']['PASSWORD']
        response = self.deform_client.user.create(email=email, password=password)
        assert_that(response, has_entry('message', expected_message))
def test_confirm(self):
    """Confirm a user with a code and check the mocked session id is returned."""
    session_id = 'Qivjy3PW3dqxAOY9i4MeT1H0FHyGkw'
    mocked_payload = {'sessionId': session_id}
    with responses.RequestsMock() as rsps:
        rsps.add(
            self.deform_client.user.confirm.method.upper(),
            self.deform_client.user.confirm.get_context({})['url'],
            json=mocked_payload,
            status=200,
        )
        response = self.deform_client.user.confirm(code='12345')
        assert_that(response, has_entry('sessionId', session_id))
def test_confirm_already_confirmed_code(self):
    """A mocked 409 response to ``user.confirm`` must raise ConflictError."""
    with responses.RequestsMock() as rsps:
        rsps.add(
            self.deform_client.user.confirm.method.upper(),
            self.deform_client.user.confirm.get_context({})['url'],
            json={'message': 'User already exists.'},
            status=409,
        )
        assert_that(
            calling(self.deform_client.user.confirm).with_args(code='12345'),
            # Raw string: '\.' is an invalid escape in a normal literal.
            raises(ConflictError, r'^User already exists\.$'),
        )
def test_all_the_things(self, core_client, mgmt_server_base_uri, resource, params):
    """Exercise LoginMessage.show / LoginMessage.set against a mocked POST endpoint."""
    endpoint = mgmt_server_base_uri + resource
    expected_body = {'foo': 'bar', 'message': 'OK'}
    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.POST,
            endpoint,
            json=expected_body,
            status=200,
            content_type='application/json',
        )
        login_message = cpauto.LoginMessage(core_client)

        if resource == "show-login-message":
            reply = login_message.show()
            assert reply.status_code == 200
            assert reply.json() == expected_body

        if resource == "set-login-message":
            reply = login_message.set(params=params)
            assert reply.status_code == 200
            assert reply.json() == expected_body
def test_show_all_access_layers(core_client, mgmt_server_base_uri, limit, offset, order, details_level):
    """Call AccessLayer.show_all against a mocked 'show-access-layers' endpoint."""
    endpoint = mgmt_server_base_uri + 'show-access-layers'
    expected_body = {'foo': 'bar', 'message': 'OK'}
    with responses.RequestsMock() as rsps:
        rsps.add(
            responses.POST,
            endpoint,
            json=expected_body,
            status=200,
            content_type='application/json',
        )
        access_layer = cpauto.AccessLayer(core_client)
        reply = access_layer.show_all(
            limit=limit,
            offset=offset,
            order=order,
            details_level=details_level,
        )
        assert reply.status_code == 200
        assert reply.json() == expected_body


# NATRule
def test_preferred_object(self):
    """Fetch a preferred object twice from a mocked multipart GetObject response."""
    get_object_url = 'http://server.rets.com/rets/GetObject.ashx'
    with open('tests/rets_responses/GetObject_multipart.byte', 'rb') as fixture:
        multipart_body = fixture.read()

    multipart_headers = {
        'Content-Type': 'multipart/parallel; boundary="24cbd0e0afd2589bb9dcb1f34cf19862"; charset=utf-8',
        'Connection': 'keep-alive',
        'RETS-Version': 'RETS/1.7.2',
        'MIME-Version': '1.0, 1.0',
    }

    with responses.RequestsMock() as resps:
        # First lookup: resource given by name, response carries multipart headers.
        resps.add(resps.POST, get_object_url, body=multipart_body, status=200,
                  headers=multipart_headers)
        by_name = self.session.get_preferred_object(
            resource='Property', object_type='Photo', content_id=1)
        self.assertTrue(by_name)

        # Second lookup: resource given as a dict, response registered without headers.
        resps.add(resps.POST, get_object_url, body=multipart_body, status=200)
        resource = {'ResourceID': 'Agent'}
        by_dict = self.session.get_preferred_object(
            resource=resource, object_type='Photo', content_id=1)
        self.assertTrue(by_dict)
def test_class_metadata(self):
    """Check the class counts parsed from a multi-class and a single-class fixture."""
    metadata_url = 'http://server.rets.com/rets/GetMetadata.ashx'
    with open('tests/rets_responses/COMPACT-DECODED/GetMetadata_classes.xml') as fixture:
        many_classes_xml = fixture.read()
    with open('tests/rets_responses/COMPACT-DECODED/GetMetadata_classes_single.xml') as fixture:
        one_class_xml = fixture.read()

    with responses.RequestsMock() as resps:
        resps.add(resps.POST, metadata_url, body=many_classes_xml, status=200)
        agent_classes = self.session.get_class_metadata(resource='Agent')
        self.assertEqual(len(agent_classes), 6)

        resps.add(resps.POST, metadata_url, body=one_class_xml, status=200)
        property_classes = self.session.get_class_metadata(resource='Property')
        self.assertEqual(len(property_classes), 1)
def test_auto_offset(self):
    """Search with two queued response pages and expect six results in total."""
    search_url = 'http://server.rets.com/rets/Search.ashx'
    with open('tests/rets_responses/COMPACT-DECODED/Search_1of2.xml') as fixture:
        first_page = fixture.read()
    with open('tests/rets_responses/COMPACT-DECODED/Search_2of2.xml') as fixture:
        second_page = fixture.read()

    with responses.RequestsMock() as resps:
        for page in (first_page, second_page):
            resps.add(resps.POST, search_url, body=page, status=200, stream=True)

        results = self.session.search(
            resource='Property',
            resource_class='RES',
            search_filter={'ListingPrice': 200000},
        )
        self.assertEqual(len(results), 6)
def test_change_parser_automatically(self):
    """metadata_format moves from COMPACT-DECODED to STANDARD-XML after an error reply."""
    metadata_url = 'http://server.rets.com/rets/GetMetadata.ashx'
    self.assertEqual(self.session.metadata_format, 'COMPACT-DECODED')

    with open('tests/rets_responses/Errors/20514.xml') as fixture:
        dtd_error_xml = fixture.read()
    with open('tests/rets_responses/STANDARD-XML/GetMetadata_system.xml') as fixture:
        system_xml = fixture.read()

    with responses.RequestsMock() as resps:
        # The error reply is queued first, the STANDARD-XML payload second.
        for payload in (dtd_error_xml, system_xml):
            resps.add(resps.POST, metadata_url, body=payload, status=200)
        self.session.get_system_metadata()

    self.assertEqual(self.session.metadata_format, 'STANDARD-XML')
def test_request_renew_auth_token_success(self, auth_mock):
    """A 401 followed by a 200 yields the second body and the expected headers."""
    auth_mock.request_new_access_token.return_value = "access_token"
    mocked_url = 'https://hostname.com/v1/this/1'
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        rsps.add(responses.GET, mocked_url, body="foo", status=401)
        rsps.add(responses.GET, mocked_url, body="bar", status=200)

        response = self.session.get(self.request_url)
        sent_headers = dict(response.request.headers)

        assert response.text == "bar"
        assert all(
            value == sent_headers[key]
            for key, value in self.exp_headers.items()
        )
def test_request_renew_auth_token_fail(self, auth_mock):
    """Two consecutive 401 replies: the body of the second one is returned."""
    auth_mock.request_new_access_token.return_value = "access_token"
    mocked_url = 'https://hostname.com/v1/this/1'
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        rsps.add(responses.GET, mocked_url, body="bar", status=401)
        rsps.add(responses.GET, mocked_url, body="foo", status=401)

        response = self.session.get(self.request_url)
        assert response.text == "foo"
def test_no_hooks(self, auth_mock):
    """With no hooks registered on the session, neither hook mock is called."""
    auth_mock.request_new_access_token.return_value = "access_token"
    mocked_url = 'https://hostname.com/v1/this/1'
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        rsps.add(responses.GET, mocked_url, body="foo", status=401)
        rsps.add(responses.GET, mocked_url, body="bar", status=200)

        session = self.session
        hook_mock = mock.Mock()

        # no hooks
        session.get(self.request_url)

        assert_not_called(hook_mock.pre_hook)
        assert_not_called(hook_mock.post_hook)
def test_pre_hook(self, auth_mock):
    """Only a pre-hook is registered: it is called once, the post-hook never."""
    auth_mock.request_new_access_token.return_value = "access_token"
    mocked_url = 'https://hostname.com/v1/this/1'
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        rsps.add(responses.GET, mocked_url, body="foo", status=401)
        rsps.add(responses.GET, mocked_url, body="bar", status=200)

        session = self.session
        hook_mock = mock.Mock()
        session.register_pre_hook(hook_mock.pre_hook)

        session.get(self.request_url)

        assert_called_once(hook_mock.pre_hook)
        assert_not_called(hook_mock.post_hook)
def test_pre_and_post_hooks(self, auth_mock):
    """Both hooks are registered and each must be called exactly once."""
    auth_mock.request_new_access_token.return_value = "access_token"
    mocked_url = 'https://hostname.com/v1/this/1'
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        rsps.add(responses.GET, mocked_url, body="foo", status=401)
        rsps.add(responses.GET, mocked_url, body="bar", status=200)

        session = self.session
        hook_mock = mock.Mock()
        session.register_pre_hook(hook_mock.pre_hook)
        session.register_post_hook(hook_mock.post_hook)

        session.get(self.request_url)

        assert_called_once(hook_mock.pre_hook)
        assert_called_once(hook_mock.post_hook)
def with_mock_responses(req_resp=None):
    """Decorator factory that runs a test method inside a ``responses`` mock.

    Fixtures are loaded with ``load_reqresp_fixture`` from ``req_resp`` when
    given, otherwise from "<snake_case_class_name>/<function_name>". Each
    fixture is registered on the mock, which is then passed to the wrapped
    function as the ``responses`` keyword argument.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # CamelCase class name of the first positional argument -> snake_case.
            snake_class = re.sub(
                r'([A-Z]+)', r'_\1', args[0].__class__.__name__
            ).lower().strip('_')
            fixture_name = req_resp or "{}/{}".format(snake_class, f.__name__)
            fixtures = load_reqresp_fixture(fixture_name)

            with responses.RequestsMock(assert_all_requests_are_fired=False) as rsps:
                for fixture in fixtures:
                    if 'url_re' in fixture:
                        # Regex fixtures are matched as compiled patterns.
                        fixture['url'] = re.compile(
                            Client.build_url(fixture.pop('url_re')))
                    else:
                        fixture['url'] = Client.build_url(fixture['url'])

                    if fixture.get('content_type') == "application/json":
                        fixture['body'] = json.dumps(fixture['body'])

                    rsps.add(**fixture)
                return f(*args, responses=rsps, **kwargs)
        return wrapper
    return decorator
def test_modpack_fetch(minimal_pack, tinkers_update):
    """Check that fetching a file into the pack directory works.

    The first fetch runs before the target directory exists and must raise
    OSError. After creating it, the file must exist with an mtime equal to
    the file's date. A further fetch must issue no HTTP calls.
    """
    minimal_pack.path /= 'files'
    session = requests.Session()
    mod_file = tinkers_update

    with betamax.Betamax(session).use_cassette('file-fetch'):
        with pytest.raises(OSError):
            minimal_pack.fetch(mod_file, session=session)

        minimal_pack.path.mkdir()
        minimal_pack.fetch(mod_file, session=session)

        target = minimal_pack.path / mod_file.name
        assert target.is_file()
        assert target.stat().st_mtime == mod_file.date.timestamp()

        with responses.RequestsMock() as rsps:
            minimal_pack.fetch(mod_file, session=session)
            assert len(rsps.calls) == 0
def setup(self, rsps: RequestsMock):
    """Register mocked GitHub refs responses on ``rsps``.

    Each name in ``self.package_names`` gets a GET response listing the tag
    refs of the matching entries in ``mock_package_list``. Afterwards,
    ``self._expected_missing`` 404 responses are registered under a regex
    built from ``Github._REFS_URL``.
    """
    for package_name in self.package_names:
        tag_refs = [
            {"ref": "refs/tags/{}".format(entry['version'])}
            for entry in mock_package_list
            if entry['name'] == package_name
        ]
        # A bare name stands for "<name>/<name>".
        github_name = (
            package_name if '/' in package_name
            else package_name + "/" + package_name
        )
        rsps.add(rsps.GET, Github._REFS_URL.format(github_name), json=tag_refs)

    not_found_pattern = re.compile(
        escape_regex_format_str(Github._REFS_URL).format("([a-zA-Z-_.]+)"))
    for _ in range(self._expected_missing):
        rsps.add(rsps.GET, not_found_pattern, status=404)
def test_parameterized_queries(self, type, query, parameters, expected, dw, dataset_key, query_response_json):
    """Run a parameterized query and check the request URL contains every expected fragment."""

    def request_callback(request):
        # Every expected fragment must appear in the outgoing URL.
        for fragment in expected:
            assert_that(
                request.url,
                contains_string(fragment),
                reason="Expected [[\n{}\n]] to contain "
                       "[[\n{}\n]]".format(request.url, expected))
        return (200, {}, json.dumps(query_response_json))

    query_endpoint = re.compile(r'https?://query\.data\.world/.*')
    with responses.RequestsMock() as rsps:
        rsps.add_callback(
            rsps.GET,
            query_endpoint,
            callback=request_callback,
            content_type="application/json",
            match_querystring=True,
        )
        dw.query(dataset_key, query, query_type=type, parameters=parameters)
def test_write_basic(self):
    """Writing "test" through RemoteFile uploads that text with the expected User-Agent."""

    def upload_endpoint(request):
        uploaded = ''.join(chunk.decode('utf-8') for chunk in request.body)
        assert "test" == uploaded
        assert request.headers.get('User-Agent') == _user_agent()
        return 200, {}, json.dumps({})

    upload_url = '{}/uploads/{}/{}/files/{}'.format(
        'https://api.data.world/v0', 'user', 'dataset', 'file.txt')

    with responses.RequestsMock() as resp:
        resp.add_callback(resp.PUT, upload_url, callback=upload_endpoint)
        with RemoteFile(DefaultConfig(), "user/dataset", "file.txt") as writer:
            writer.write("test")
def test_write_csv(self):
    """Rows written through csv.DictWriter arrive at the upload endpoint as CSV text."""

    def upload_endpoint(request):
        uploaded = ''.join(chunk.decode('utf-8') for chunk in request.body)
        assert "a,b\r\n42,17\r\n420,178\r\n" == uploaded
        assert request.headers.get('User-Agent') == _user_agent()
        return 200, {}, json.dumps({})

    upload_url = '{}/uploads/{}/{}/files/{}'.format(
        'https://api.data.world/v0', 'user', 'dataset', 'file.csv')

    with responses.RequestsMock() as resp:
        resp.add_callback(resp.PUT, upload_url, callback=upload_endpoint)
        with RemoteFile(DefaultConfig(), "user/dataset", "file.csv") as writer:
            csv_writer = csv.DictWriter(writer, fieldnames=['a', 'b'])
            csv_writer.writeheader()
            for row in ({'a': 42, 'b': 17}, {'a': 420, 'b': 178}):
                csv_writer.writerow(row)
def test_read_jsonl(self):
    """Reading a mocked two-line JSONL download yields two decoded rows."""

    def download_endpoint(request):
        assert request.headers.get('User-Agent') == _user_agent()
        payload = ('{"A":"1", "B":"2", "C":"3"}\n'
                   '{"A":"4", "B":"5", "C":"6"}\n')
        return 200, {}, payload

    download_url = '{}/file_download/{}/{}/{}'.format(
        'https://query.data.world', 'user', 'dataset', 'file.csv')

    with responses.RequestsMock() as resp:
        resp.add_callback(resp.GET, download_url, callback=download_endpoint)
        with RemoteFile(DefaultConfig(), "user/dataset", "file.csv", mode="r") as reader:
            # Blank lines are skipped before decoding.
            rows = [json.loads(line) for line in reader if line.strip()]
            assert rows[0] == {'A': '1', 'B': '2', 'C': '3'}
            assert rows[1] == {'A': '4', 'B': '5', 'C': '6'}
def test_endpoint_all(self):
    """``users.all()`` returns one item per user in the mocked listing."""
    mocked_users = [
        {'id': 1, 'username': 'user1', 'group': 'watchers'},
        {'id': 2, 'username': 'user2', 'group': 'watchers'},
        {'id': 3, 'username': 'user3', 'group': 'admin'},
    ]
    with responses.RequestsMock() as rsps:
        rsps.add(responses.GET, MOCK_API_URL + '/users', json=mocked_users)
        users = generic_client.users.all()
        self.assertEqual(len(users), 3)
def test_endpoint_links(self):
    """The ``Link`` response header is exposed as parsed links on the response."""
    page_two = [
        {'id': 3, 'username': 'user1', 'group': 'watchers'},
        {'id': 4, 'username': 'user2', 'group': 'watchers'},
    ]
    link_header = {
        'Link': '<http://example.com/users?page=3>; rel=next,<http://example.com/users?page=1>; rel=previous'
    }
    expected_links = {
        'next': {'url': 'http://example.com/users?page=3', 'rel': 'next'},
        'previous': {'url': 'http://example.com/users?page=1', 'rel': 'previous'},
    }
    with responses.RequestsMock() as rsps:
        rsps.add(
            'GET',
            MOCK_API_URL + '/users?page=2',
            json=page_two,
            match_querystring=True,
            headers=link_header,
        )
        users = generic_client.users.filter(page=2)
        self.assertEqual(users.response.links, expected_links)
def test_endpoint_delete(self):
    """Delete succeeds on 204 and raises the matching error on 404 and 500."""
    user_url = MOCK_API_URL + '/users/1'

    # 204: the call completes without raising.
    with responses.RequestsMock() as rsps:
        rsps.add(responses.DELETE, user_url, status=204)
        generic_client.users.delete(1)

    # 404: ResourceNotFound is raised.
    with responses.RequestsMock() as rsps:
        rsps.add(responses.DELETE, user_url, status=404)
        with self.assertRaises(generic_client.ResourceNotFound):
            generic_client.users.delete(1)

    # 500: HTTPError is raised.
    with responses.RequestsMock() as rsps:
        rsps.add(responses.DELETE, user_url, status=500)
        with self.assertRaises(generic_client.HTTPError):
            generic_client.users.delete(1)
def test_resource_save_uuid(self):
    """Fetch a user by ``uuid``, change its group and save it via a mocked PUT."""
    user_url = MOCK_API_URL + '/users/1'

    fetched_payload = {'uuid': 1, 'username': 'user1', 'group': 'watchers'}
    with responses.RequestsMock() as rsps:
        rsps.add(responses.GET, user_url, json=fetched_payload)
        user1 = generic_client.users.get(uuid=1)
        self.assertEqual(user1.username, 'user1')

    saved_payload = {'uuid': 1, 'username': 'user1', 'group': 'admins'}
    with responses.RequestsMock() as rsps:
        rsps.add(responses.PUT, user_url, json=saved_payload)
        user1.group = 'admins'
        user1.save()
def test_resource_save(self):
    """Fetch a user by ``id``, change its group and save it via a mocked PUT."""
    user_url = MOCK_API_URL + '/users/1/'

    fetched_payload = {'id': 1, 'username': 'user1', 'group': 'watchers'}
    with responses.RequestsMock() as rsps:
        rsps.add(responses.GET, user_url, json=fetched_payload)
        user1 = generic_client.users.get(id=1)
        self.assertEqual(user1.username, 'user1')

    saved_payload = {'id': 1, 'username': 'user1', 'group': 'admins'}
    with responses.RequestsMock() as rsps:
        rsps.add(responses.PUT, user_url, json=saved_payload)
        user1.group = 'admins'
        user1.save()
def test_endpoint_detail_route(self):
    """Call the ``notify`` detail route for user 2 via POST, then via GET."""
    notify_url = MOCK_API_URL + '/users/2/notify'
    expected = {'unread': 3}

    with responses.RequestsMock() as rsps:
        rsps.add_callback(
            responses.POST,
            notify_url,
            callback=request_callback,
            content_type='application/json',
        )
        response = generic_client.users(id=2).notify(unread=3)
        self.assertEqual(response.json(), expected)

    with responses.RequestsMock() as rsps:
        rsps.add_callback(
            responses.GET,
            notify_url,
            callback=request_callback,
            content_type='application/json',
        )
        response = generic_client.users(_method='get', id=2).notify(unread=3)
        self.assertEqual(response.json(), expected)
def test_endpoint_list_route(self):
    """Call the ``notify`` list route on users via POST, then via GET."""
    notify_url = MOCK_API_URL + '/users/notify'
    expected = {'unread': 3}

    with responses.RequestsMock() as rsps:
        rsps.add_callback(
            responses.POST,
            notify_url,
            callback=request_callback,
            content_type='application/json',
        )
        response = generic_client.users().notify(unread=3)
        self.assertEqual(response.json(), expected)

    with responses.RequestsMock() as rsps:
        rsps.add_callback(
            responses.GET,
            notify_url,
            callback=request_callback,
            content_type='application/json',
        )
        response = generic_client.users(_method='get').notify(unread=3)
        self.assertEqual(response.json(), expected)
def test_endpoint_links(self):
    """The ``Link`` header is parsed into response links (trailing-slash URLs)."""
    page_two = [
        {'id': 3, 'username': 'user1', 'group': 'watchers'},
        {'id': 4, 'username': 'user2', 'group': 'watchers'},
    ]
    link_header = {
        'Link': '<http://example.com/users/?page=3>; rel=next,<http://example.com/users/?page=1>; rel=previous'
    }
    expected_links = {
        'next': {'url': 'http://example.com/users/?page=3', 'rel': 'next'},
        'previous': {'url': 'http://example.com/users/?page=1', 'rel': 'previous'},
    }
    with responses.RequestsMock() as rsps:
        rsps.add(
            'GET',
            MOCK_API_URL + '/users/?page=2',
            json=page_two,
            match_querystring=True,
            headers=link_header,
        )
        users = generic_client.users.filter(page=2)
        self.assertEqual(users.response.links, expected_links)
def test_failure_page(self):
    """Load the confirmation page for a payment whose GOV.UK status is 'failed'.

    The payments API and the GOV.UK payment lookup are both mocked. The page
    must show the failure message and the payment reference.
    """
    processor_id = '3'
    # The 'Z' suffix marks the timestamp as UTC, so take the time in UTC.
    # The original used naive local time, mislabelling it as UTC.
    created = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    with responses.RequestsMock() as rsps:
        rsps.add(rsps.GET, api_url('/payments/'), json={
            'uuid': 'wargle-blargle',
            'processor_id': processor_id,
            'recipient_name': 'James Bond',
            'amount': 2000,
            'status': 'pending',
            'created': created.isoformat() + 'Z',
            'prisoner_number': 'A5544CD',
            'prisoner_dob': '1992-12-05'
        })
        rsps.add(rsps.GET, govuk_url('/payments/%s' % processor_id), json={
            'state': {'status': 'failed'}
        })
        self.driver.get(self.live_server_url + '/en-gb/debit-card/confirmation/?payment_ref=REF12345')
    self.assertInSource('We’re sorry, your payment could not be processed on this occasion')
    self.assertInSource('Your reference number is <strong>REF12345</strong>')
def assertFormValid(self, form):  # noqa
    """Assert ``form`` is valid while the prisoner validity API is mocked to return one match."""
    validity_payload = {
        'count': 1,
        'results': [{
            'prisoner_number': 'A1234AB',
            'prisoner_dob': '1980-10-05',
        }],
    }
    with responses.RequestsMock() as rsps:
        with mock.patch('send_money.forms.PrisonerDetailsForm.get_api_session') as mocked_api_session:
            mocked_api_session.side_effect = get_api_session
            mock_auth(rsps)
            rsps.add(
                rsps.GET,
                api_url('/prisoner_validity/'),
                json=validity_payload,
                status=200,
            )
            self.assertTrue(form.is_valid(), msg='\n\n%s' % form.errors.as_text())
def test_update_incomplete_payments_no_govuk_payment_found(self):
    """A 404 from the GOV.UK lookup leads to a PATCH with status 'failed'."""
    payment_url = api_url('/payments/%s/' % self.ref)
    govuk_payment_url = govuk_url('/payments/%s/' % self.processor_id)

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        rsps.add(rsps.GET, api_url('/payments/'), json=self.payment_data, status=200)
        rsps.add(rsps.GET, govuk_payment_url, status=404)
        rsps.add(rsps.PATCH, payment_url, status=200)

        call_command('update_incomplete_payments')

        # Recorded calls are inspected while the mock is still active.
        patch_body = rsps.calls[3].request.body.decode()
        self.assertEqual(patch_body, '{"status": "failed"}')
def test_search_not_limited_to_specific_prisons(self, mocked_api_session):
    """Bank transfer flow: the validity lookup carries no ``prisons`` parameter."""
    mocked_api_session.side_effect = get_api_session
    self.choose_bank_transfer_payment_method()

    lookup_url = (
        api_url('/prisoner_validity/') +
        '?prisoner_number=A1231DE'
        '&prisoner_dob=1980-10-04'
    )
    form_data = {
        'prisoner_number': 'A1231DE',
        'prisoner_dob_0': '4',
        'prisoner_dob_1': '10',
        'prisoner_dob_2': '1980',
    }
    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        rsps.add(
            rsps.GET,
            lookup_url,
            match_querystring=True,
            json={'count': 0, 'results': []},
        )
        self.client.post(self.url, data=form_data, follow=True)
def test_can_limit_search_to_specific_prisons(self, mocked_api_session):
    """Bank transfer flow: the validity lookup includes ``prisons=ABC,DEF``."""
    mocked_api_session.side_effect = get_api_session
    self.choose_bank_transfer_payment_method()

    lookup_url = (
        api_url('/prisoner_validity/') +
        '?prisoner_number=A1231DE'
        '&prisoner_dob=1980-10-04'
        '&prisons=ABC,DEF'
    )
    form_data = {
        'prisoner_number': 'A1231DE',
        'prisoner_dob_0': '4',
        'prisoner_dob_1': '10',
        'prisoner_dob_2': '1980',
    }
    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        rsps.add(
            rsps.GET,
            lookup_url,
            match_querystring=True,
            json={'count': 0, 'results': []},
        )
        self.client.post(self.url, data=form_data, follow=True)
def test_search_not_limited_to_specific_prisons(self, mocked_api_session):
    """Debit card flow: the validity lookup carries no ``prisons`` parameter."""
    mocked_api_session.side_effect = get_api_session
    self.choose_debit_card_payment_method()

    lookup_url = (
        api_url('/prisoner_validity/') +
        '?prisoner_number=A1231DE'
        '&prisoner_dob=1980-10-04'
    )
    form_data = {
        'prisoner_name': 'john smith',
        'prisoner_number': 'A1231DE',
        'prisoner_dob_0': '4',
        'prisoner_dob_1': '10',
        'prisoner_dob_2': '1980',
    }
    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        rsps.add(
            rsps.GET,
            lookup_url,
            match_querystring=True,
            json={'count': 0, 'results': []},
        )
        self.client.post(self.url, data=form_data, follow=True)
def test_debit_card_payment_handles_govuk_errors(self):
    """A 500 from the GOV.UK payments endpoint results in the failure message."""
    self.choose_debit_card_payment_method()
    self.fill_in_prisoner_details()
    self.fill_in_amount()

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        # The API accepts the payment; GOV.UK then fails.
        rsps.add(
            rsps.POST,
            api_url('/payments/'),
            json={'uuid': 'wargle-blargle'},
            status=201,
        )
        rsps.add(rsps.POST, govuk_url('/payments/'), status=500)

        with self.patch_prisoner_details_check():
            with silence_logger():
                response = self.client.get(self.url, follow=False)

        self.assertContains(
            response,
            'We’re sorry, your payment could not be processed on this occasion')
def test_confirmation_handles_api_update_errors(self):
    """A 500 from the payment lookup shows the failure message and clears the session."""
    self.choose_debit_card_payment_method()
    self.fill_in_prisoner_details()
    self.fill_in_amount()

    with responses.RequestsMock() as rsps:
        mock_auth(rsps)
        rsps.add(rsps.GET, api_url('/payments/%s/' % self.ref), status=500)

        with self.patch_prisoner_details_check():
            with silence_logger():
                response = self.client.get(
                    self.url, {'payment_ref': self.ref}, follow=False)

        self.assertContains(response, 'your payment could not be processed')
        self.assertContains(response, self.ref[:8].upper())

        # check session is cleared
        session = self.client.session
        for key in self.complete_session_keys:
            self.assertNotIn(key, session)
def response_mocker(kwargs, base_url, endpoint_url, status=200,
                    content_type='application/json', post=False, data=None):
    """
    Generates a mocked requests response for a given set of kwargs, base url
    and endpoint url.

    ``{{name}}`` placeholders in ``base_url + endpoint_url`` are replaced by
    ``kwargs.get(name)`` formatted with ``%s``.

    :param kwargs: mapping used to fill the URL placeholders
    :param base_url: first part of the URL template
    :param endpoint_url: second part of the URL template
    :param status: HTTP status of the mocked response
    :param content_type: selects the mocked body for GET requests:
        'application/json', 'text/plain', or anything else for a binary body
    :param post: when true, a POST with a JSON body is mocked and
        ``content_type`` is not consulted
    :param data: payload passed to ``requests.post`` when ``post`` is true
    :return: the response object obtained through the mock
    """
    # Raw string: '\{' and '\}' are invalid escapes in a normal literal.
    url = re.sub(r'\{\{(?P<m>[a-zA-Z_]+)\}\}',
                 lambda m: "%s" % kwargs.get(m.group(1)),
                 base_url + endpoint_url)

    # Choose the mocked body and content type once, then register it once.
    if post or content_type == 'application/json':
        body = b'{"data": "some json formatted output"}'
        mocked_type = 'application/json'
    elif content_type == 'text/plain':
        body = "Some text-based content\n spanning multiple lines"
        mocked_type = 'text/plain'
    else:
        body = b"Some other binary stuff..."
        mocked_type = 'application/octet-stream'

    method = responses.POST if post else responses.GET
    with responses.RequestsMock() as rsps:
        rsps.add(method, url, body=body, status=status, content_type=mocked_type)
        if post:
            response = requests.post(url, data=data)
        else:
            response = requests.get(url)
    return response
def test_downloader_http_errors(self):
    """Run the downloader against a 500 then a 404 reply; stitching must produce no file."""
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        for failing_status in (500, 404):
            rsps.add(responses.GET, self.TEST_MPD_URL, status=failing_status)

        downloader = live.Downloader(
            mpd=self.TEST_MPD_URL,
            output_dir='output_httperrors',
            duplicate_etag_retry=2,
            singlethreaded=True)
        downloader.run()
        downloader.stream_id = '17875351285037717'

        output_file = 'output_httperrors.mp4'
        downloader.stitch(output_file, cleartempfiles=True)
        file_exists = os.path.isfile(output_file)
        self.assertFalse(file_exists, '{0!s} is generated'.format(output_file))
def test_downloader_resp_headers(self):
    """Run the downloader against MPD replies carrying different response headers.

    Both runs use ``assert_all_requests_are_fired=True``, so every queued
    reply must be requested before the downloader returns.
    """
    with open('mpdstub/mpd/17875351285037717.mpd', 'r') as mpd_file:
        mpd_content = mpd_file.read()

    # First run: no headers, then a short max-age, then the broadcast-ended header.
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        rsps.add(responses.GET, self.TEST_MPD_URL, body=mpd_content)
        rsps.add(responses.GET, self.TEST_MPD_URL, body=mpd_content,
                 headers={'Cache-Control': 'max-age=1'})
        rsps.add(responses.GET, self.TEST_MPD_URL, body=mpd_content,
                 headers={'X-FB-Video-Broadcast-Ended': '1'})

        downloader = live.Downloader(
            mpd=self.TEST_MPD_URL,
            output_dir='output_respheaders')
        downloader.run()

    # Second run: a short max-age followed by a long one.
    with responses.RequestsMock(assert_all_requests_are_fired=True) as rsps:
        for max_age in ('max-age=1', 'max-age=1000'):
            rsps.add(responses.GET, self.TEST_MPD_URL, body=mpd_content,
                     headers={'Cache-Control': max_age})

        downloader = live.Downloader(
            mpd=self.TEST_MPD_URL,
            output_dir='output_respheaders')
        downloader.run()

    # Can't stitch and check for output because responses does not support
    # url pass through, so the segments cannot be downloaded.
def api_responses():
    """Yield an active ``responses.RequestsMock``; it is closed when the generator finishes."""
    with responses.RequestsMock() as mocked_requests:
        yield mocked_requests
def test_generate_metadata(self):
    """Generate metadata from mocked InfluxDB replies and compare with the stored XML.

    The replies are queued in the order the original test registered them:
    one database list, two measurement lists, then four field-keys/tag-keys
    pairs. The generated metadata is also written to
    ``test_data/tmp_metadata.xml``.
    """
    # Raw strings: '\+' is an invalid escape in a normal literal.
    queued_replies = [
        (r'.*SHOW\+DATABASES.*', json_database_list),
        (r'.*SHOW\+MEASUREMENTS.*', json_measurement_list),
        (r'.*SHOW\+MEASUREMENTS.*', json_measurement_list),
    ] + [
        (r'.*SHOW\+FIELD\+KEYS.*', json_field_keys),
        (r'.*SHOW\+TAG\+KEYS.*', json_tag_keys),
    ] * 4

    with RequestsMock() as rsp:
        for pattern, payload in queued_replies:
            rsp.add(rsp.GET, re.compile(pattern), json=payload,
                    match_querystring=True)
        metadata = generate_metadata('influxdb://localhost:8086')

        # Context managers close both files; the original leaked the handles.
        with open(os.path.join('test_data', 'test_metadata.xml'), 'r') as expected_file:
            file1 = expected_file.read()
        with open(os.path.join('test_data', 'tmp_metadata.xml'), 'wb') as dump_file:
            dump_file.write(metadata)

        # assert_ is a deprecated alias that was removed in Python 3.12.
        self.assertEqual(metadata, file1)
def test_len_collection(self):
    """``len()`` of the first feed's collection equals NUM_TEST_POINTS under a mocked COUNT query."""
    first_feed = next(self._container.itervalues())
    collection = first_feed.OpenCollection()
    with RequestsMock() as rsp:
        # Raw string: '\+' is an invalid escape in a normal literal.
        rsp.add(rsp.GET, re.compile(r'.*SELECT\+COUNT.*'),
                json=json_count(collection.name), match_querystring=True)
        len_collection = len(collection)
        self.assertEqual(len_collection, NUM_TEST_POINTS)