我们从Python开源项目中，提取了以下21个代码示例，用于说明如何使用httpretty.PUT。
def test_get_or_create_db():
    """Check that ``get_or_create`` makes the "test" database show up on the server.

    The mocked server initially answers HEAD on the database URL with 404.
    When the PUT arrives, the callback resets httpretty and re-registers the
    URL so that HEAD answers 200 and any further PUT answers 500.
    """
    db_url = "http://test.test:5984/test"
    server = BootstrapServer("http://test.test:5984")

    def create_test_db(request, uri, headers):
        # Swap the registrations as a side effect of the first PUT: from now
        # on the database is reported as present (HEAD 200) and a second PUT
        # is answered with 500.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, db_url, status=200)
        httpretty.register_uri(httpretty.PUT, db_url, status=500)
        return 201, headers, ""

    httpretty.register_uri(httpretty.HEAD, db_url, status=404, body="")
    httpretty.register_uri(httpretty.PUT, db_url, body=create_test_db)

    assert "test" not in server
    # The return value is not inspected; only the membership check matters.
    server.get_or_create("test")
    assert "test" in server
def test_get_or_create_db():
    """Check that ``Server.get_or_create`` makes the "test" database show up.

    The mocked server initially answers HEAD on the database URL with 404.
    When the PUT arrives, the callback resets httpretty and re-registers the
    URL so that HEAD answers 200 and any further PUT answers 500.
    """
    db_url = "http://test.test:5984/test"
    server = Server("http://test.test:5984")

    def create_test_db(request, uri, headers):
        # Swap the registrations as a side effect of the first PUT: from now
        # on the database is reported as present (HEAD 200) and a second PUT
        # is answered with 500.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, db_url, status=200)
        httpretty.register_uri(httpretty.PUT, db_url, status=500)
        return 201, headers, ""

    httpretty.register_uri(httpretty.HEAD, db_url, status=404, body="")
    httpretty.register_uri(httpretty.PUT, db_url, body=create_test_db)

    assert "test" not in server
    # The return value is not inspected; only the membership check matters.
    server.get_or_create("test")
    assert "test" in server
def test_upload(self):
    """Upload a chunk against a mocked PUT endpoint that answers with an empty body."""
    # NOTE(review): the mocked URL is derived from DOWNLOAD_URL[1] even though
    # this is the upload test -- confirm that is the intended template.
    mocked_url = OnedriveAPIClient.DOWNLOAD_URL[1].format(
        path=self._get_path()
    )
    httpretty.register_uri(httpretty.PUT, mocked_url, body='')

    self.client.upload(self.chunk, TEST_CHUNK_BODY)
def test_create_user():
    """Call ``create_user`` against a mocked ``_users`` database.

    HEAD on the ``_users`` database and PUT (201) on the user document are
    mocked; the test passes as long as ``create_user`` does not raise.
    """
    users_db_url = "http://test.test:5984/_users"
    user_doc_url = "http://test.test:5984/_users/org.couchdb.user%3Atest"
    server = BootstrapServer("http://test.test:5984")

    httpretty.register_uri(httpretty.HEAD, users_db_url)
    httpretty.register_uri(httpretty.PUT, user_doc_url, status=201)

    server.create_user("test", "test")
def test_update(self):
    """Verify that ``Plan.update`` returns the mocked PUT response for plan 78."""
    plan_url = "https://api.paystack.co/plan/78"
    mocked_body = '{"status": true, "contributors": true}'
    httpretty.register_uri(
        httpretty.PUT,
        plan_url,
        content_type='text/json',
        body=mocked_body,
        status=201,
    )

    response = Plan.update(plan_id=78, interval="monthly", amount=70000)

    self.assertEqual(response['status'], True)
def test_update(self):
    """Verify that ``Customer.update`` returns the mocked PUT response for customer 4013."""
    customer_url = "https://api.paystack.co/customer/4013"
    mocked_body = '{"status": true, "contributors": true}'
    httpretty.register_uri(
        httpretty.PUT,
        customer_url,
        content_type='text/json',
        body=mocked_body,
        status=201,
    )

    response = Customer.update(customer_id=4013, first_name='andela')

    self.assertEqual(response['status'], True)
def test_put_method_called(self):
    """Verify that ``PayStackRequests.put`` returns the mocked PUT response.

    The endpoint is mocked with status 302 and a JSON body whose ``status``
    field is true; the test only checks that field in the result.
    """
    customer_url = "https://api.paystack.co/customer/4013"
    mocked_body = '{"status": true, "contributors": null}'
    httpretty.register_uri(
        httpretty.PUT,
        customer_url,
        content_type='text/json',
        body=mocked_body,
        status=302,
    )

    client = PayStackRequests()
    response = client.put('customer/4013', data={'test': 'requests'})

    self.assertTrue(response['status'])
def test_init_without_cloud_server(
    config, push_design_documents, get_or_create, generate_config
):
    """Drive the ``show`` and ``init`` CLI commands with only a local server.

    Sequence checked: ``show`` fails before any server is selected, ``init``
    succeeds and pushes design documents once, ``show`` then succeeds, and a
    second ``init`` pointing at a different URL fails.

    The parameters are presumably pytest fixtures / mock objects (they are
    used via ``return_value``, ``call_count`` and ``reset_mock``) -- confirm
    against the test module's fixtures.
    """
    cli = CliRunner()
    config_url = "http://localhost:5984/_config/test/test"
    generate_config.return_value = {"test": {"test": "test"}}
    httpretty.register_uri(httpretty.GET, config_url, body='"test_val"')
    httpretty.register_uri(httpretty.PUT, config_url)

    # Show -- expected to fail because no local server is selected yet.
    outcome = cli.invoke(show)
    assert outcome.exit_code, outcome.output

    # Init -- expected to succeed and push the design documents, without
    # replicating anything.
    outcome = cli.invoke(init)
    assert outcome.exit_code == 0, outcome.exception or outcome.output
    assert get_or_create.call_count == len(all_dbs)
    assert push_design_documents.call_count == 1
    push_design_documents.reset_mock()

    # Show -- expected to succeed now.
    outcome = cli.invoke(show)
    assert outcome.exit_code == 0, outcome.exception or outcome.output

    # Init -- expected to fail because a different database is already
    # selected.
    outcome = cli.invoke(init, ["--db_url", "http://test.test:5984"])
    assert outcome.exit_code, outcome.output
def test_user_with_cloud_server(config):
    """Drive the ``register``, ``login`` and ``logout`` CLI commands.

    Sequence checked: registering succeeds, logging in succeeds, a second
    login as a different user exits with ``SystemExit``, and logging out
    succeeds.
    """
    cli = CliRunner()
    users_db_url = "http://test.test:5984/_users"
    user_doc_url = "http://test.test:5984/_users/org.couchdb.user%3Atest"
    session_url = "http://test.test:5984/_session"

    # Register -- expected to succeed.
    httpretty.register_uri(httpretty.HEAD, users_db_url)
    httpretty.register_uri(httpretty.PUT, user_doc_url, status=201)
    outcome = cli.invoke(register, input="test\ntest\ntest\n")
    assert outcome.exit_code == 0, outcome.exception or outcome.output

    # Login -- expected to succeed.
    httpretty.register_uri(httpretty.GET, session_url)
    outcome = cli.invoke(login, input="test\ntest\n")
    assert outcome.exit_code == 0, outcome.exception or outcome.output

    # Login -- expected to fail because a different user is already logged in.
    outcome = cli.invoke(login, input="test2\ntest2\n")
    assert outcome.exit_code, outcome.output
    assert isinstance(outcome.exception, SystemExit)

    # Logout -- expected to succeed.
    outcome = cli.invoke(logout)
    assert outcome.exit_code == 0, outcome.exception or outcome.output
def test_create_user():
    """Call ``Server.create_user`` against a mocked ``_users`` database.

    HEAD on the ``_users`` database and PUT (201) on the user document are
    mocked; the test passes as long as ``create_user`` does not raise.
    """
    users_db_url = "http://test.test:5984/_users"
    user_doc_url = "http://test.test:5984/_users/org.couchdb.user%3Atest"
    server = Server("http://test.test:5984")

    httpretty.register_uri(httpretty.HEAD, users_db_url)
    httpretty.register_uri(httpretty.PUT, user_doc_url, status=201)

    server.create_user("test", "test")
def given_an_existing_user(self):
    """Set up the HTTP mocks for the user at ``/users/foo``.

    Starts HTTP mocking, registers a fake response backed by
    ``user-foo.json`` and declares an expected PUT on the same path.
    """
    user_path = '/users/foo'
    self.start_mocking_http()
    self.fake_response(user_path, file='user-foo.json')
    self.expect_call(user_path, httpretty.PUT)
def given_an_existing_user(self):
    """Set up the HTTP mocks for the user at ``/users/admin``.

    Starts HTTP mocking, registers a fake response backed by
    ``user-admin.json`` and declares an expected PUT on the same path.
    """
    user_path = '/users/admin'
    self.start_mocking_http()
    self.fake_response(user_path, file='user-admin.json')
    self.expect_call(user_path, httpretty.PUT)
def given_an_existing_user(self):
    """Set up the HTTP mocks for the user at ``/users/giddy``.

    Starts HTTP mocking, registers a fake response backed by
    ``user-giddy.json`` and declares an expected PUT on the same path.
    """
    user_path = '/users/giddy'
    self.start_mocking_http()
    self.fake_response(user_path, file='user-giddy.json')
    self.expect_call(user_path, httpretty.PUT)
def swagger_stub(swagger_files_url):
    """Fixture to stub a microservice from swagger files.

    To use this fixture you need to define a swagger fixture named
    swagger_files_url with the path to your swagger files, and the url to stub.
    Then just add this fixture to your tests and your request pointing to the
    urls in swagger_files_url will be managed by the stub.

    Example:
        @pytest.fixture
        def swagger_files_url():
            return [('tests/swagger.yaml', 'http://localhost:8000')]

    Args:
        swagger_files_url: iterable of entries whose item 0 is the swagger
            file path and item 1 is the base url to stub.

    Yields:
        The ``StubMemory`` created for the base url.
    """
    stubbed_methods = (
        httpretty.GET,
        httpretty.POST,
        httpretty.PUT,
        httpretty.PATCH,
        httpretty.DELETE,
    )

    httpretty.enable()
    try:
        for i in swagger_files_url:  # Get all given swagger files and url
            base_url = i[1]
            s = SwaggerParser(i[0])
            swagger_url[base_url] = s

            # Register every stubbed method on any path below the base url.
            url_pattern = re.compile(base_url + r'/.*')
            for method in stubbed_methods:
                httpretty.register_uri(
                    method, url_pattern, body=get_data_from_request)

            memory[base_url] = StubMemory(s)
            yield memory[base_url]
    finally:
        # Close httpretty even if setup failed part-way, so the socket
        # patching never leaks into later tests.
        httpretty.disable()
        httpretty.reset()
def test_replicate():
    """Start, repeat and cancel a replication against a mocked ``_replicator`` db.

    The PUT and DELETE handlers reset httpretty and re-register the HEAD
    mocks, so that the replication document is reported as present after
    the PUT and as missing (404) after the DELETE.
    """
    replicator_url = "http://test.test:5984/_replicator"
    replication_doc_url = "http://test.test:5984/_replicator/test"
    server = BootstrapServer("http://test.test:5984")

    def replicate_test_src(request, uri, headers):
        # After the PUT the replication document answers HEAD with 200.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, replicator_url)
        httpretty.register_uri(
            httpretty.HEAD, replication_doc_url, status=200, etag="a"
        )
        payload = {"id": "test_src", "rev": "a", "ok": True}
        return 201, headers, json.dumps(payload)

    def cancel_test_replication(request, uri, headers):
        # After the DELETE the replication document answers HEAD with 404.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, replicator_url)
        httpretty.register_uri(
            httpretty.HEAD, replication_doc_url, status=404
        )
        return 200, headers, ""

    # Start a replication
    httpretty.register_uri(httpretty.HEAD, replicator_url)
    httpretty.register_uri(httpretty.HEAD, replication_doc_url, status=404)
    httpretty.register_uri(
        httpretty.PUT,
        replication_doc_url,
        status=201,
        content_type="application/json",
        body=replicate_test_src,
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Make sure replicate is idempotent: a PUT answering 500 is registered
    # and the same replication is requested a second time.
    httpretty.register_uri(
        httpretty.PUT,
        "http://test.test:5984/_replicator/test_src",
        status=500,
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Cancel the replication
    httpretty.register_uri(
        httpretty.DELETE,
        replication_doc_url,
        status=200,
        body=cancel_test_replication,
    )
    server.cancel_replication("test")
    assert "test" not in server["_replicator"]
def test_push_design_documents():
    """Push design documents from a temporary directory tree to a mocked server.

    The tree contains a ``test`` database directory with one view
    (``views/test/map.js``) plus a hidden directory (``.test``) and a hidden
    file (``.test.js``).  The mocked PUT handler answers 200 only when the
    uploaded document equals the expected design document, which contains
    just the ``test`` view's ``map`` entry; any other body gets a 500.
    """
    server = BootstrapServer("http://test.test:5984")
    design_doc_url = "http://test.test:5984/test/_design/openag"
    expected_doc = {
        "_id": "_design/openag",
        "views": {"test": {"map": "test"}},
    }

    def create_design_doc(request, uri, headers):
        # Reject any payload that is not exactly the expected document.
        if json.loads(request.body) != expected_doc:
            return 500, headers, ""
        return 200, headers, json.dumps({
            "id": "_design/openag", "rev": "a"
        })

    tempdir = tempfile.mkdtemp()
    try:
        test_db_path = os.path.join(tempdir, "test")
        hidden_db_path = os.path.join(tempdir, ".test")
        views_path = os.path.join(test_db_path, "views")
        test_view_path = os.path.join(views_path, "test")
        # Parents come before children so each plain mkdir succeeds.
        for directory in (
            test_db_path, hidden_db_path, views_path, test_view_path
        ):
            os.mkdir(directory)

        # One regular view file and one hidden file, both with the same text.
        for filename in ("map.js", ".test.js"):
            with open(os.path.join(test_view_path, filename), "w+") as f:
                f.write("test")

        httpretty.register_uri(
            httpretty.HEAD, "http://test.test:5984/test"
        )
        httpretty.register_uri(httpretty.HEAD, design_doc_url, status=404)
        httpretty.register_uri(
            httpretty.PUT,
            design_doc_url,
            body=create_design_doc,
            content_type="application/json",
        )
        server.push_design_documents(tempdir)
    finally:
        shutil.rmtree(tempdir)
def test_replicate():
    """Start, repeat and cancel a replication through ``Server`` on a mocked db.

    The PUT and DELETE handlers reset httpretty and re-register the HEAD
    mocks, so that the replication document is reported as present after
    the PUT and as missing (404) after the DELETE.
    """
    replicator_url = "http://test.test:5984/_replicator"
    replication_doc_url = "http://test.test:5984/_replicator/test"
    server = Server("http://test.test:5984")

    def replicate_test_src(request, uri, headers):
        # After the PUT the replication document answers HEAD with 200.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, replicator_url)
        httpretty.register_uri(
            httpretty.HEAD, replication_doc_url, status=200, etag="a"
        )
        payload = {"id": "test_src", "rev": "a", "ok": True}
        return 201, headers, json.dumps(payload)

    def cancel_test_replication(request, uri, headers):
        # After the DELETE the replication document answers HEAD with 404.
        httpretty.reset()
        httpretty.register_uri(httpretty.HEAD, replicator_url)
        httpretty.register_uri(
            httpretty.HEAD, replication_doc_url, status=404
        )
        return 200, headers, ""

    # Start a replication
    httpretty.register_uri(httpretty.HEAD, replicator_url)
    httpretty.register_uri(httpretty.HEAD, replication_doc_url, status=404)
    httpretty.register_uri(
        httpretty.PUT,
        replication_doc_url,
        status=201,
        content_type="application/json",
        body=replicate_test_src,
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Make sure replicate is idempotent: a PUT answering 500 is registered
    # and the same replication is requested a second time.
    httpretty.register_uri(
        httpretty.PUT,
        "http://test.test:5984/_replicator/test_src",
        status=500,
    )
    server.replicate("test", "test_src", "test_dest", continuous=True)
    assert "test" in server["_replicator"]

    # Cancel the replication
    httpretty.register_uri(
        httpretty.DELETE,
        replication_doc_url,
        status=200,
        body=cancel_test_replication,
    )
    server.cancel_replication("test")
    assert "test" not in server["_replicator"]