我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用httpretty.reset()。
def testCanDeleteOwnedDomain(self): httpretty.enable() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name+ '.') url = reverse('domain-detail', args=(self.ownedDomains[1].pk,)) response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) self.assertEqual(httpretty.last_request().method, 'DELETE') self.assertEqual(httpretty.last_request().headers['Host'], 'nsmaster:8081') httpretty.reset() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name+ '.') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertTrue(isinstance(httpretty.last_request(), httpretty.core.HTTPrettyRequestEmpty))
def testCanDeleteOwnedDynDomain(self): httpretty.enable() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') url = reverse('domain-detail', args=(self.ownedDomains[1].pk,)) response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) # FIXME In this testing scenario, the parent domain dedyn.io does not # have the proper NS and DS records set up, so we cannot test their # deletion. httpretty.reset() httpretty.register_uri(httpretty.DELETE, settings.NSLORD_PDNS_API + '/zones/' + self.ownedDomains[1].name + '.') httpretty.register_uri(httpretty.DELETE, settings.NSMASTER_PDNS_API + '/zones/' + self.ownedDomains[1].name+ '.') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertTrue(isinstance(httpretty.last_request(), httpretty.core.HTTPrettyRequestEmpty))
def test_get_or_create_db(): server = BootstrapServer("http://test.test:5984") httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/test", status=404, body="" ) def create_test_db(request, uri, headers): httpretty.reset() httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/test", status=200 ) httpretty.register_uri( httpretty.PUT, "http://test.test:5984/test", status=500 ) return 201, headers, "" httpretty.register_uri( httpretty.PUT, "http://test.test:5984/test", body=create_test_db ) assert "test" not in server test_db = server.get_or_create("test") assert "test" in server
def test_get_or_create_db(): server = Server("http://test.test:5984") httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/test", status=404, body="" ) def create_test_db(request, uri, headers): httpretty.reset() httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/test", status=200 ) httpretty.register_uri( httpretty.PUT, "http://test.test:5984/test", status=500 ) return 201, headers, "" httpretty.register_uri( httpretty.PUT, "http://test.test:5984/test", body=create_test_db ) assert "test" not in server test_db = server.get_or_create("test") assert "test" in server
def teardown_httpretty(): httpretty.disable() httpretty.reset()
def test_request_timeout(): timeout = 0.1 http_scheme = 'http' host = 'coordinator' port = 8080 url = http_scheme + '://' + host + ':' + str(port) + constants.URL_STATEMENT_PATH def long_call(request, uri, headers): time.sleep(timeout * 2) return (200, headers, "delayed success") httpretty.enable() for method in [httpretty.POST, httpretty.GET]: httpretty.register_uri(method, url, body=long_call) # timeout without retry for request_timeout in [timeout, (timeout, timeout)]: req = PrestoRequest( host=host, port=port, user='test', http_scheme=http_scheme, max_attempts=1, request_timeout=request_timeout, ) with pytest.raises(requests.exceptions.Timeout): req.get(url) with pytest.raises(requests.exceptions.Timeout): req.post('select 1') httpretty.disable() httpretty.reset()
def tearDown(self): os.remove("line_file.json") os.remove("array_file.json") os.remove("object_file.json") os.remove("block_file.json") httpretty.disable() httpretty.reset()
def tearDown(self): httpretty.disable() httpretty.reset()
def tearDown(self): os.remove("test.csv") httpretty.disable() httpretty.reset()
def cleanUp(self): httpretty.disable() httpretty.reset()
def tearDown(self): httpretty.reset() httpretty.disable()
def setUp(self): httpretty.reset() httpretty.disable() if not hasattr(self, 'owner'): self.owner = utils.createUser() self.ownedDomains = [utils.createDomain(self.owner), utils.createDomain(self.owner)] self.token = utils.createToken(user=self.owner) self.client.credentials(HTTP_AUTHORIZATION='Token ' + self.token) self.otherOwner = utils.createUser() self.otherDomains = [utils.createDomain(self.otherOwner), utils.createDomain()] self.otherToken = utils.createToken(user=self.otherOwner)
def test_project_filter(connection, project_payload, project): """Verifies project listing and filtering by type.""" httpretty.register_uri( httpretty.GET, '{}/projects'.format( matchlight.MATCHLIGHT_API_URL_V2), body=json.dumps({'data': [project_payload]}), content_type='application/json', status=200) projects = connection.projects.filter() assert len(projects) == 1 assert projects[0].upload_token == project.upload_token httpretty.reset() project_list = [project_payload] for _ in six.moves.range(5): payload = project_payload.copy() for project_type in PROJECT_TYPES: if project_type == payload['project_type']: continue payload['project_type'] = project_type break project_list.append(payload) httpretty.register_uri( httpretty.GET, '{}/projects'.format( matchlight.MATCHLIGHT_API_URL_V2), body=json.dumps({'data': project_list}), content_type='application/json', status=200) projects = connection.projects.filter(project_type=project.project_type) assert len(projects) == 1 assert projects[0].project_type == project.project_type
def test_alive(self): httpretty.enable() httpretty.register_uri(httpretty.GET, SERVICE_URL, body="LIVE") monitor = SocketPortMonitor(SERVICE_HOST, port=SERVICE_PORT); self.assertTrue(monitor.alive()) httpretty.disable() httpretty.reset()
def test_check_alive(self): httpretty.enable() httpretty.register_uri(httpretty.GET, SERVICE_URL, body="LIVE") self.assertTrue(check_alive('SocketPortMonitor', SERVICE_HOST, port='8888')) httpretty.disable() httpretty.reset()
def httpretty(): httpretty_module.reset() httpretty_module.enable() yield httpretty_module httpretty_module.disable()
def step_impl(context): d = context.bad_login assert d['source']['username'] != 'ValidUser' httpretty.enable() httpretty.register_uri( httpretty.POST, context.good_build_url, body='{"errors":[{"context":null,"message":"Authentication failed. Please check your credentials and try again.","exceptionName":"com.atlassian.bitbucket.auth.IncorrectPasswordAuthenticationException"}]}', status=401 ) from scripts.bitbucket import post_result, err result = post_result(context.good_build_url, d['source']['username'], d['source']['password'], False, good_status_dict(), True) assert result.status_code == 401 httpretty.disable() httpretty.reset()
def step_impl(context): d = context.good_login assert context.good_login['source']['username'] == 'ValidUser' httpretty.enable() httpretty.register_uri( httpretty.POST, context.bad_build_url, body='{"errors":[{"context":null,"message":"You are not permitted to access this resource","exceptionName":"com.atlassian.bitbucket.AuthorisationException"}]}', status=403 ) from scripts.bitbucket import post_result, err result = post_result(context.bad_build_url, d['source']['username'], d['source']['password'], False, good_status_dict(), True) assert result.status_code == 403 httpretty.disable() httpretty.reset()
def tearDown(self): httpretty.disable() # disable afterwards, so that you will have no problems in code that uses that socket module httpretty.reset()
def read_temperatures(self): with patch('ConfigParser.open', create=True) as open_: with open('mock_data/' + self.ini_file_name) as file_: def reset(): file_.seek(0) open_.return_value.readline = file_.readline open_.return_value.close = reset import evohome return evohome.read()
def tearDown(self): """Disable afterwards to let other code use that socket module""" httpretty.disable() httpretty.reset() # ----- # About # -----
def cleanup_httpretty(self): httpretty.reset() httpretty.disable()
def test_has_request(): ("httpretty.has_request() correctly detects " "whether or not a request has been made") httpretty.reset() httpretty.has_request().should.be.false with patch('httpretty.httpretty.last_request', return_value=HTTPrettyRequest('')): httpretty.has_request().should.be.true
def swagger_stub(swagger_files_url): """Fixture to stub a microservice from swagger files. To use this fixture you need to define a swagger fixture named swagger_files_url with the path to your swagger files, and the url to stub. Then just add this fixture to your tests and your request pointing to the urls in swagger_files_url will be managed by the stub. Example: @pytest.fixture def swagger_files_url(): return [('tests/swagger.yaml', 'http://localhost:8000')] """ httpretty.enable() for i in swagger_files_url: # Get all given swagger files and url base_url = i[1] s = SwaggerParser(i[0]) swagger_url[base_url] = s # Register all urls httpretty.register_uri( httpretty.GET, re.compile(base_url + r'/.*'), body=get_data_from_request) httpretty.register_uri( httpretty.POST, re.compile(base_url + r'/.*'), body=get_data_from_request) httpretty.register_uri( httpretty.PUT, re.compile(base_url + r'/.*'), body=get_data_from_request) httpretty.register_uri( httpretty.PATCH, re.compile(base_url + r'/.*'), body=get_data_from_request) httpretty.register_uri( httpretty.DELETE, re.compile(base_url + r'/.*'), body=get_data_from_request) memory[base_url] = StubMemory(s) yield memory[base_url] # Close httpretty httpretty.disable() httpretty.reset()
def test_record_add_pii(connection, project, pii_records_raw): """Verifies adding PII records to a project.""" record_data = [ { 'id': uuid.uuid4().hex, 'name': matchlight.utils.blind_email(record['email']), 'description': '', 'ctime': time.time(), 'mtime': time.time(), } for record in pii_records_raw ] httpretty.register_uri( httpretty.POST, '{}/records/upload/pii/{}'.format( matchlight.MATCHLIGHT_API_URL_V2, project.upload_token), responses=[ httpretty.Response( body=json.dumps({ 'id': payload['id'], 'name': payload['name'], 'description': payload['description'], 'ctime': payload['ctime'], 'mtime': payload['mtime'], 'metadata': '{}', }), content_type='application/json', status=200) for payload in record_data ]) for i, pii_record in enumerate(pii_records_raw): record = connection.records.add_pii( project=project, description='', **pii_record) assert record.id == record_data[i]['id'] httpretty.reset() for i, pii_record in enumerate(pii_records_raw): record = connection.records.add_pii( project=project, description='', offline=True, **pii_record) assert isinstance(record, dict) assert not httpretty.has_request()
def test_replicate(): # Start a replication server = BootstrapServer("http://test.test:5984") httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator" ) httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator/test", status=404 ) def replicate_test_src(request, uri, headers): httpretty.reset() httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator" ) httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator/test", status=200, etag="a" ) return 201, headers, json.dumps({ "id": "test_src", "rev": "a", "ok": True }) httpretty.register_uri( httpretty.PUT, "http://test.test:5984/_replicator/test", status=201, content_type="application/json", body=replicate_test_src ) server.replicate("test", "test_src", "test_dest", continuous=True) assert "test" in server["_replicator"] # Make sure replicate is idempotent httpretty.register_uri( httpretty.PUT, "http://test.test:5984/_replicator/test_src", status=500 ) server.replicate("test", "test_src", "test_dest", continuous=True) assert "test" in server["_replicator"] # Cancel the replication def cancel_test_replication(request, uri, headers): httpretty.reset() httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator" ) httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator/test", status=404 ) return 200, headers, "" httpretty.register_uri( httpretty.DELETE, "http://test.test:5984/_replicator/test", status=200, body=cancel_test_replication ) server.cancel_replication("test") assert "test" not in server["_replicator"]
def test_replicate(): # Start a replication server = Server("http://test.test:5984") httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator" ) httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator/test", status=404 ) def replicate_test_src(request, uri, headers): httpretty.reset() httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator" ) httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator/test", status=200, etag="a" ) return 201, headers, json.dumps({ "id": "test_src", "rev": "a", "ok": True }) httpretty.register_uri( httpretty.PUT, "http://test.test:5984/_replicator/test", status=201, content_type="application/json", body=replicate_test_src ) server.replicate("test", "test_src", "test_dest", continuous=True) assert "test" in server["_replicator"] # Make sure replicate is idempotent httpretty.register_uri( httpretty.PUT, "http://test.test:5984/_replicator/test_src", status=500 ) server.replicate("test", "test_src", "test_dest", continuous=True) assert "test" in server["_replicator"] # Cancel the replication def cancel_test_replication(request, uri, headers): httpretty.reset() httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator" ) httpretty.register_uri( httpretty.HEAD, "http://test.test:5984/_replicator/test", status=404 ) return 200, headers, "" httpretty.register_uri( httpretty.DELETE, "http://test.test:5984/_replicator/test", status=200, body=cancel_test_replication ) server.cancel_replication("test") assert "test" not in server["_replicator"]