我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用responses.HEAD。
def test_submit_media_exceptions(self, response):
    """
    Check that ``submit_media`` raises ``ThreePlayMediaPerformTranscriptionError``.

    The media URL HEAD check and the available-languages lookup are mocked
    as successful; the mock for the file-submission POST is built from the
    supplied ``response`` keyword arguments.
    """
    media_url = u'https://s3.amazonaws.com/bkt/video.mp4'
    languages_url = u'https://api.3playmedia.com/caption_imports/available_languages'
    files_url = u'https://api.3playmedia.com/files'

    # The media file is reported as an existing mp4.
    responses.add(
        responses.HEAD,
        media_url,
        headers={'Content-Type': u'video/mp4'},
        status=200,
    )

    # The language lookup returns a single English entry.
    available_languages = json.dumps([{
        "iso_639_1_code": "en",
        "language_id": 1,
    }])
    responses.add(responses.GET, languages_url, status=200, body=available_languages)

    # The submission endpoint replies with the response under test.
    responses.add(responses.POST, files_url, **response)

    client = ThreePlayMediaClient(**self.video_transcript_preferences)
    with self.assertRaises(ThreePlayMediaPerformTranscriptionError):
        client.submit_media()
def test_generate_transcripts_exceptions(self, first_response, second_response, third_response, mock_log):
    """
    Check the logging and database state after ``generate_transcripts`` runs
    against the supplied mocked responses.

    ``first_response``, ``second_response`` and ``third_response`` are keyword
    arguments for the HEAD (media URL), GET (available languages) and POST
    (file submission) mocks respectively. The test expects no info-level log,
    one specific ``exception`` log call, and no ``TranscriptProcessMetadata``
    rows.
    """
    mocked_endpoints = (
        (responses.HEAD, u'https://s3.amazonaws.com/bkt/video.mp4', first_response),
        (
            responses.GET,
            u'https://api.3playmedia.com/caption_imports/available_languages',
            second_response,
        ),
        (responses.POST, u'https://api.3playmedia.com/files', third_response),
    )
    for http_method, endpoint, mock_kwargs in mocked_endpoints:
        responses.add(http_method, endpoint, **mock_kwargs)

    client = ThreePlayMediaClient(**self.video_transcript_preferences)
    client.generate_transcripts()

    self.assertFalse(mock_log.info.called)
    mock_log.exception.assert_called_with(
        u'[3PlayMedia] Could not process transcripts for video=%s source_language=%s.',
        VIDEO_DATA['studio_id'],
        VIDEO_DATA['source_language'],
    )
    self.assertEqual(TranscriptProcessMetadata.objects.count(), 0)
def set_up_glove(url: str, byt: bytes, change_etag_every: int = 1000):
    """
    Register mocked GET and HEAD responses for ``url``.

    GET always returns ``byt`` as a streamed gzip payload with a matching
    ``Content-Length`` header. HEAD is served by a callback whose ``ETag``
    header starts at ``"0"`` and is incremented after every
    ``change_etag_every`` HEAD requests.
    """
    responses.add(
        responses.GET,
        url,
        body=byt,
        status=200,
        content_type='application/gzip',
        stream=True,
        headers={'Content-Length': str(len(byt))},
    )

    remaining = change_etag_every
    current_etag = "0"

    def head_callback(_request):
        """
        Answer a HEAD request with the current ETag.

        Using a callback lets successive HEAD requests get different
        answers, so a changed ETag can stand in for a new file version.
        """
        nonlocal remaining, current_etag

        # Capture the headers first: this request still sees the old ETag.
        reply_headers = {"ETag": current_etag}

        remaining -= 1
        if remaining > 0:
            return (200, reply_headers, "")

        # Countdown exhausted: reset it and bump the ETag for later requests.
        remaining = change_etag_every
        current_etag = str(int(current_etag) + 1)
        return (200, reply_headers, "")

    responses.add_callback(responses.HEAD, url, callback=head_callback)
def test_validate_media_url(self, response):
    """
    Check that ``validate_media_url`` raises ``ThreePlayMediaUrlError`` when
    the media URL's HEAD request is answered with the supplied ``response``.
    """
    media_url = u'https://s3.amazonaws.com/bkt/video.mp4'
    responses.add(responses.HEAD, media_url, **response)

    client = ThreePlayMediaClient(**self.video_transcript_preferences)
    with self.assertRaises(ThreePlayMediaUrlError):
        client.validate_media_url()
def test_remove_from_search_after_sync(self):
    """When an image is removed from the source, it should be removed from the search engine"""
    # Index the image and confirm the search finds exactly one hit for it.
    self._index_img(self.removed)
    s = self.s.query(Q("match", title="removed"))
    r = s.execute()
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(1, r.hits.total)

    # The upstream HEAD check for the image answers 404 while syncing.
    # NOTE(review): block extent reconstructed from flattened source -
    # confirm that only the sync call belongs inside the mock context.
    with responses.RequestsMock() as rsps:
        rsps.add(responses.HEAD, FOREIGN_URL + TEST_IMAGE_REMOVED, status=404)
        self.removed.sync()

    # Update the search index for the image and refresh before querying again.
    signals._update_search_index(self.removed)
    self.es.indices.refresh()

    s = self.s.query(Q("match", title="removed"))
    r = s.execute()
    self.assertEqual(0, r.hits.total)
def test_push(self):
    """
    Build ``foo/bar`` and push it twice.

    The mocks are arranged so that the first push sees a 404 on each blob's
    HEAD URL followed by a PUT, while the second push sees a successful HEAD
    and has no PUT mock left to consume.
    """
    here = os.path.dirname(__file__)
    command.build('foo/bar', os.path.join(here, './build_simple.yml'))

    pkg_obj = store.PackageStore.find_package('foo', 'bar')
    pkg_hash = pkg_obj.get_hash()
    assert pkg_hash

    all_hashes = set(find_object_hashes(pkg_obj.get_contents()))

    # We will push the package twice, so we're mocking all responses twice.
    upload_urls = {}
    for blob_hash in all_hashes:
        head_url = "https://example.com/head/{owner}/{hash}".format(owner='foo', hash=blob_hash)
        put_url = "https://example.com/put/{owner}/{hash}".format(owner='foo', hash=blob_hash)
        upload_urls[blob_hash] = dict(head=head_url, put=put_url)

        # First time the package is pushed, s3 HEAD 404s, and we get a PUT.
        self.requests_mock.add(responses.HEAD, head_url, status=404)
        self.requests_mock.add(responses.PUT, put_url)

        # Second time, s3 HEAD succeeds, and we're not expecting a PUT.
        self.requests_mock.add(responses.HEAD, head_url)

    # One package/tag mock pair per push.
    for _ in range(2):
        self._mock_put_package('foo/bar', pkg_hash, upload_urls)
        self._mock_put_tag('foo/bar', 'latest')

    # Push a new package.
    command.push('foo/bar')

    # Push it again; this time, we're verifying that there are no s3 uploads.
    command.push('foo/bar')
def setup_recording(self, **kwargs):
    """
    Reset the response mock and route every ``http*`` URL to a recorder.

    For each of GET, POST, PUT, PATCH, DELETE, HEAD and OPTIONS, a callback
    obtained from a fresh ``self.record()`` call is registered against a
    catch-all URL pattern. ``kwargs`` is accepted but not used here.
    """
    _logger.info("recording ...")
    self.responses.reset()

    any_http_url = re.compile("http.*")
    recorded_methods = (
        responses.GET,
        responses.POST,
        responses.PUT,
        responses.PATCH,
        responses.DELETE,
        responses.HEAD,
        responses.OPTIONS,
    )
    for verb in recorded_methods:
        # self.record() is invoked once per method, as in the original loop.
        recorder = self.record()
        self.responses.add_callback(
            verb,
            any_http_url,
            match_querystring=False,
            callback=recorder,
        )
def test_get_from_cache(self):
    """
    Exercise ``get_from_cache`` three times against a mocked URL whose ETag
    changes every two HEAD requests, checking the returned filename, the
    number of HEAD/GET requests made so far, and the cached file contents
    after each call.
    """
    url = 'http://fake.datastore.com/glove.txt.gz'
    set_up_glove(url, self.glove_bytes, change_etag_every=2)

    def check_requests(expected_heads, expected_gets):
        # Only HEAD and GET should ever have been issued.
        tally = Counter(call.request.method for call in responses.calls)
        assert len(tally) == 2
        assert tally['HEAD'] == expected_heads
        assert tally['GET'] == expected_gets

    def check_contents(path):
        with open(path, 'rb') as cached_file:
            assert cached_file.read() == self.glove_bytes

    first = get_from_cache(url, cache_dir=self.TEST_DIR)
    assert first == os.path.join(self.TEST_DIR, url_to_filename(url, etag="0"))

    # We should have made one HEAD request and one GET request.
    check_requests(1, 1)

    # And the cached file should have the correct contents
    check_contents(first)

    # A second call to `get_from_cache` should make another HEAD call
    # but not another GET call.
    second = get_from_cache(url, cache_dir=self.TEST_DIR)
    assert second == first
    check_requests(2, 1)
    check_contents(second)

    # A third call should have a different ETag and should force a new download,
    # which means another HEAD call and another GET call.
    third = get_from_cache(url, cache_dir=self.TEST_DIR)
    assert third == os.path.join(self.TEST_DIR, url_to_filename(url, etag="1"))
    check_requests(3, 2)
    check_contents(third)