我们从 Python 开源项目中提取了以下 28 个代码示例，用于说明如何使用 falcon.HTTP_201。
def on_post(self, req, resp, revision_id, tag=None):
    """Create a revision tag and answer with ``201 Created``.

    The request body is parsed as YAML and handed, together with the
    revision id and tag name, to ``db_api.revision_tag_create``. The
    created tag is rendered through ``revision_tag_view.ViewBuilder``.

    :param req: Falcon request whose stream carries the YAML body.
    :param resp: Falcon response that receives the status and body.
    :param revision_id: Identifier of the revision to tag.
    :param tag: Name of the tag to create.
    :raises falcon.HTTPBadRequest: If the body is not valid YAML or
        ``errors.RevisionTagBadFormat`` is raised.
    :raises falcon.HTTPNotFound: If ``errors.RevisionNotFound`` is raised.
    """
    body = req.stream.read(req.content_length or 0)

    try:
        tag_data = yaml.safe_load(body)
    except yaml.YAMLError as e:
        error_msg = ("Could not parse the request body into YAML data. "
                     "Details: %s." % e)
        LOG.error(error_msg)
        # The description has to be text; passing the exception object
        # itself leaves a non-string in the error payload.
        raise falcon.HTTPBadRequest(description=str(e))

    try:
        resp_tag = db_api.revision_tag_create(revision_id, tag, tag_data)
    except errors.RevisionNotFound as e:
        raise falcon.HTTPNotFound(description=e.format_message())
    except errors.RevisionTagBadFormat as e:
        raise falcon.HTTPBadRequest(description=e.format_message())

    resp_body = revision_tag_view.ViewBuilder().show(resp_tag)
    resp.status = falcon.HTTP_201
    resp.body = resp_body
def on_post(self, req, resp, revision_id):
    """Roll back to ``revision_id`` and answer with ``201 Created``.

    The latest revision is fetched first; if any of its documents has a
    ``storagePolicy`` of ``encrypted``, the
    ``deckhand:create_encrypted_documents`` policy is checked once before
    the rollback is performed.

    :param req: Falcon request; ``req.context`` is used for the policy check.
    :param resp: Falcon response that receives the rendered revision.
    :param revision_id: Identifier of the revision to roll back to.
    :raises falcon.HTTPNotFound: If ``errors.RevisionNotFound`` is raised.
    :raises falcon.HTTPBadRequest: If ``errors.InvalidRollback`` is raised.
    """
    try:
        latest = db_api.revision_get_latest()
    except errors.RevisionNotFound as exc:
        raise falcon.HTTPNotFound(description=exc.format_message())

    # A single authorization is enough, however many encrypted documents
    # the revision holds; any() stops at the first match.
    has_encrypted = any(
        doc['metadata'].get('storagePolicy') == 'encrypted'
        for doc in latest['documents'])
    if has_encrypted:
        policy.conditional_authorize(
            'deckhand:create_encrypted_documents', req.context)

    try:
        rolled_back = db_api.revision_rollback(revision_id, latest)
    except errors.InvalidRollback as exc:
        raise falcon.HTTPBadRequest(description=exc.format_message())

    rendered = self.view_builder.show(rolled_back)
    resp.status = falcon.HTTP_201
    resp.body = rendered
def test_http_status_raised_from_error_handler(self):
    """An HTTPStatus raised inside an error handler sets the response status.

    The captured middleware response must report the same status as the
    simulated client response.
    """
    capture = CaptureResponseMiddleware()
    app = falcon.API(middleware=capture)
    app.add_route('/', MiddlewareClassResource())
    client = testing.TestClient(app)

    def _raise_created(error, req, resp, params):
        raise falcon.HTTPStatus(falcon.HTTP_201)

    # NOTE(kgriffs): This will take precedence over the default
    # handler for falcon.HTTPError.
    app.add_error_handler(falcon.HTTPError, _raise_created)

    result = client.simulate_request(path='/', method='POST')

    assert result.status == falcon.HTTP_201
    assert capture.resp.status == result.status
def task_validate_design(self, req, resp, json_data):
    """Create async task for validate design.

    :param req: Falcon request; ``req.context`` is used for logging and
        task creation.
    :param resp: Falcon response that receives the task or the error.
    :param json_data: Parsed request body; its ``action`` must be
        ``validate_design``.
    """
    action = json_data.get('action')

    if action != 'validate_design':
        self.error(
            req.context,
            "Task body ended up in wrong handler: action %s in task_validate_design"
            % action)
        self.return_error(
            resp, falcon.HTTP_500, message="Error", retry=False)
        # Stop here: falling through would still create the task and
        # set the status to 201 on top of the error just reported.
        return

    try:
        task = self.create_task(json_data, req.context)
        resp.body = json.dumps(task.to_dict())
        resp.append_header('Location',
                           "/api/v1.0/tasks/%s" % str(task.task_id))
        resp.status = falcon.HTTP_201
    except errors.InvalidFormat as ex:
        self.error(req.context, ex.msg)
        self.return_error(
            resp, falcon.HTTP_400, message=ex.msg, retry=False)
def task_verify_site(self, req, resp, json_data):
    """Create async task for verify site.

    :param req: Falcon request; ``req.context`` is used for logging and
        task creation.
    :param resp: Falcon response that receives the task or the error.
    :param json_data: Parsed request body; its ``action`` must be
        ``verify_site``.
    """
    action = json_data.get('action')

    if action != 'verify_site':
        self.error(
            req.context,
            "Task body ended up in wrong handler: action %s in task_verify_site"
            % action)
        self.return_error(
            resp, falcon.HTTP_500, message="Error", retry=False)
        # Stop here: falling through would still create the task and
        # set the status to 201 on top of the error just reported.
        return

    try:
        task = self.create_task(json_data, req.context)
        resp.body = json.dumps(task.to_dict())
        resp.append_header('Location',
                           "/api/v1.0/tasks/%s" % str(task.task_id))
        resp.status = falcon.HTTP_201
    except errors.InvalidFormat as ex:
        self.error(req.context, ex.msg)
        self.return_error(
            resp, falcon.HTTP_400, message=ex.msg, retry=False)
def task_prepare_site(self, req, resp, json_data):
    """Create async task for prepare site.

    :param req: Falcon request; ``req.context`` is used for logging and
        task creation.
    :param resp: Falcon response that receives the task or the error.
    :param json_data: Parsed request body; its ``action`` must be
        ``prepare_site``.
    """
    action = json_data.get('action')

    if action != 'prepare_site':
        self.error(
            req.context,
            "Task body ended up in wrong handler: action %s in task_prepare_site"
            % action)
        self.return_error(
            resp, falcon.HTTP_500, message="Error", retry=False)
        # Stop here: falling through would still create the task and
        # set the status to 201 on top of the error just reported.
        return

    try:
        task = self.create_task(json_data, req.context)
        resp.body = json.dumps(task.to_dict())
        resp.append_header('Location',
                           "/api/v1.0/tasks/%s" % str(task.task_id))
        resp.status = falcon.HTTP_201
    except errors.InvalidFormat as ex:
        self.error(req.context, ex.msg)
        self.return_error(
            resp, falcon.HTTP_400, message=ex.msg, retry=False)
def task_prepare_nodes(self, req, resp, json_data):
    """Create async task for prepare node.

    :param req: Falcon request; ``req.context`` is used for logging and
        task creation.
    :param resp: Falcon response that receives the task or the error.
    :param json_data: Parsed request body; its ``action`` must be
        ``prepare_nodes``.
    """
    action = json_data.get('action')

    if action != 'prepare_nodes':
        self.error(
            req.context,
            "Task body ended up in wrong handler: action %s in task_prepare_nodes"
            % action)
        self.return_error(
            resp, falcon.HTTP_500, message="Error", retry=False)
        # Stop here: falling through would still create the task and
        # set the status to 201 on top of the error just reported.
        return

    try:
        task = self.create_task(json_data, req.context)
        resp.body = json.dumps(task.to_dict())
        resp.append_header('Location',
                           "/api/v1.0/tasks/%s" % str(task.task_id))
        resp.status = falcon.HTTP_201
    except errors.InvalidFormat as ex:
        self.error(req.context, ex.msg)
        self.return_error(
            resp, falcon.HTTP_400, message=ex.msg, retry=False)
def task_deploy_nodes(self, req, resp, json_data):
    """Create async task for deploy node.

    :param req: Falcon request; ``req.context`` is used for logging and
        task creation.
    :param resp: Falcon response that receives the task or the error.
    :param json_data: Parsed request body; its ``action`` must be
        ``deploy_nodes``.
    """
    action = json_data.get('action')

    if action != 'deploy_nodes':
        self.error(
            req.context,
            "Task body ended up in wrong handler: action %s in task_deploy_nodes"
            % action)
        self.return_error(
            resp, falcon.HTTP_500, message="Error", retry=False)
        # Stop here: falling through would still create the task and
        # set the status to 201 on top of the error just reported.
        return

    try:
        task = self.create_task(json_data, req.context)
        resp.body = json.dumps(task.to_dict())
        resp.append_header('Location',
                           "/api/v1.0/tasks/%s" % str(task.task_id))
        resp.status = falcon.HTTP_201
    except errors.InvalidFormat as ex:
        self.error(req.context, ex.msg)
        self.return_error(
            resp, falcon.HTTP_400, message=ex.msg, retry=False)
def task_destroy_nodes(self, req, resp, json_data):
    """Create async task for destroy node.

    :param req: Falcon request; ``req.context`` is used for logging and
        task creation.
    :param resp: Falcon response that receives the task or the error.
    :param json_data: Parsed request body; its ``action`` must be
        ``destroy_nodes``.
    """
    action = json_data.get('action')

    if action != 'destroy_nodes':
        self.error(
            req.context,
            "Task body ended up in wrong handler: action %s in task_destroy_nodes"
            % action)
        self.return_error(
            resp, falcon.HTTP_500, message="Error", retry=False)
        # Stop here: falling through would still create the task and
        # set the status to 201 on top of the error just reported.
        return

    try:
        task = self.create_task(json_data, req.context)
        resp.body = json.dumps(task.to_dict())
        resp.append_header('Location',
                           "/api/v1.0/tasks/%s" % str(task.task_id))
        resp.status = falcon.HTTP_201
    except errors.InvalidFormat as ex:
        self.error(req.context, ex.msg)
        self.return_error(
            resp, falcon.HTTP_400, message=ex.msg, retry=False)
def on_post(self, req, resp, parsed):
    """Persist a ``UserScores`` record and answer with ``201 Created``.

    :param req: Falcon request (unused here).
    :param resp: Falcon response that receives the new record's id.
    :param parsed: Mapping providing ``username``, ``company`` and ``score``.
    :raises falcon.HTTPBadRequest: If saving raises ``IntegrityError``.
    """
    record = models.UserScores(
        username=parsed.get('username'),
        company=parsed.get('company'),
        score=parsed.get('score'),
    )

    try:
        record.save(self.db.session)
    except IntegrityError:
        raise falcon.HTTPBadRequest(
            'Username exists',
            'Could not create user due to username already existing'
        )

    resp.status = falcon.HTTP_201
    created = {'id': record.id}
    resp.body = self.format_body(created)
def on_post(self, req, resp, revision_id, validation_name):
    """Store a validation result for a revision and answer ``201 Created``.

    The request body is parsed as YAML and must be a mapping with truthy
    ``status`` and ``validator`` entries before it is passed to
    ``db_api.validation_create``.

    :param req: Falcon request whose stream carries the YAML payload.
    :param resp: Falcon response that receives the rendered validation.
    :param revision_id: Identifier of the revision being validated.
    :param validation_name: Name under which the validation is stored.
    :raises falcon.HTTPBadRequest: If the payload is not valid YAML, is
        empty, is not a mapping, or lacks a required key.
    :raises falcon.HTTPNotFound: If ``errors.RevisionNotFound`` is raised.
    """
    raw_body = req.stream.read(req.content_length or 0)
    try:
        validation_data = yaml.safe_load(raw_body)
    except yaml.YAMLError as e:
        error_msg = ("Could not parse the validation into YAML data. "
                     "Details: %s." % e)
        LOG.error(error_msg)
        raise falcon.HTTPBadRequest(description=six.text_type(e))

    if not validation_data:
        error_msg = 'Validation payload must be provided.'
        LOG.error(error_msg)
        raise falcon.HTTPBadRequest(description=error_msg)

    required_keys = ('status', 'validator')
    # Valid YAML can also be a scalar or a list, neither of which has
    # .get(); treat those as a payload that lacks the required keys.
    if (not isinstance(validation_data, dict)
            or not all(validation_data.get(key) for key in required_keys)):
        error_msg = 'Validation payload must contain keys: %s.' % (
            ', '.join('"%s"' % key for key in required_keys))
        LOG.error(error_msg)
        raise falcon.HTTPBadRequest(description=error_msg)

    try:
        resp_body = db_api.validation_create(
            revision_id, validation_name, validation_data)
    except errors.RevisionNotFound as e:
        raise falcon.HTTPNotFound(description=e.format_message())

    resp.status = falcon.HTTP_201
    resp.append_header('Content-Type', 'application/x-yaml')
    resp.body = self.view_builder.show(resp_body)
def on_post(self, req, resp):
    """Save the uploaded image and answer with ``201 Created``.

    The request stream and content type are handed to the image store;
    the name it returns is appended to ``/images/`` to form the
    response's ``location``.

    :param req: Falcon request carrying the image stream.
    :param resp: Falcon response that receives status and location.
    """
    image_name = self._image_store.save(req.stream, req.content_type)

    resp.status = falcon.HTTP_201
    image_location = '/images/' + image_name
    resp.location = image_location
def on_post(self, req, resp, user_id):
    """Add the submitted thing for ``user_id`` and answer ``201 Created``.

    :param req: Falcon request; the document is read from
        ``req.context['doc']``.
    :param resp: Falcon response that receives status and location.
    :param user_id: Owner segment used in the resulting location.
    :raises falcon.HTTPBadRequest: If no document is present in the context.
    """
    try:
        submitted = req.context['doc']
    except KeyError:
        raise falcon.HTTPBadRequest(
            'Missing thing',
            'A thing must be submitted in the request body.')

    stored = self.db.add_thing(submitted)

    resp.status = falcon.HTTP_201
    resp.location = '/%s/things/%s' % (user_id, stored['id'])


# Configure your WSGI server to load "things.app" (app is a WSGI callable)
def test_multiple_reponse_mw_throw_exception(self):
    """Response middleware keeps running after other middleware raises.

    Two middlewares raise from ``process_response`` (one an HTTPStatus,
    one an HTTPError) while three others record their invocation. All
    three recorders must run, the final status must be 748, and only the
    first recorder to run sees ``req_succeeded`` as True.
    """
    context['req_succeeded'] = []

    class RaiseStatusMiddleware(object):
        def process_response(self, req, resp, resource):
            raise falcon.HTTPStatus(falcon.HTTP_201)

    class RaiseErrorMiddleware(object):
        def process_response(self, req, resp, resource):
            raise falcon.HTTPError(falcon.HTTP_748)

    class ProcessResponseMiddleware(object):
        def process_response(self, req, resp, resource, req_succeeded):
            context['executed_methods'].append('process_response')
            context['req_succeeded'].append(req_succeeded)

    # The order of this stack is what the assertions below depend on.
    middleware_stack = [
        ProcessResponseMiddleware(),
        RaiseErrorMiddleware(),
        ProcessResponseMiddleware(),
        RaiseStatusMiddleware(),
        ProcessResponseMiddleware(),
    ]
    app = falcon.API(middleware=middleware_stack)
    app.add_route(TEST_ROUTE, MiddlewareClassResource())
    client = testing.TestClient(app)

    result = client.simulate_request(path=TEST_ROUTE)

    assert result.status == falcon.HTTP_748
    assert context['executed_methods'] == ['process_response'] * 3
    assert context['req_succeeded'] == [True, False, False]
def test_put(self, client, resource_things):
    """A PUT to the templated stuff route calls the resource and yields 201."""
    for uri_template in ('/things', '/things/{id}/stuff/{sid}'):
        client.app.add_route(uri_template, resource_things)

    result = client.simulate_request(
        path='/things/42/stuff/1337', method='PUT')

    assert result.status == falcon.HTTP_201
    assert resource_things.called
def on_post(self, req, resp):
    """Method handler for POST requests.

    Creates a new site design, based on ``base_design_id`` when the JSON
    body supplies one, persists it through the state manager and answers
    with ``201 Created`` and the serialized design.

    :param req: Falcon request object
    :param resp: Falcon response object
    """
    try:
        json_data = self.req_json(req)

        base_design = None
        if json_data is not None:
            base_design = json_data.get('base_design_id')

        if base_design is not None:
            # NOTE(review): uuid.UUID raises ValueError for a malformed
            # id, which none of the handlers below catch.
            design = hd_objects.SiteDesign(
                base_design_id=uuid.UUID(base_design))
        else:
            # Covers a missing body as well as a body without
            # base_design_id; the latter must still yield a design,
            # otherwise assign_id() would be called on None.
            design = hd_objects.SiteDesign()

        design.assign_id()
        design.create(req.context, self.state_manager)

        resp.body = json.dumps(design.obj_to_simple())
        resp.status = falcon.HTTP_201
    except errors.StateError:
        self.error(req.context, "Error updating persistence")
        self.return_error(
            resp,
            falcon.HTTP_500,
            message="Error updating persistence",
            retry=True)
    except errors.InvalidFormat as fex:
        self.error(req.context, str(fex))
        self.return_error(
            resp, falcon.HTTP_400, message=str(fex), retry=False)
def on_post(self, req, resp, **kwargs):
    """Answer every POST with ``201 Created`` and the body ``Success``.

    :param req: Falcon request (unused).
    :param resp: Falcon response that receives status and body.
    :param kwargs: Route parameters, accepted and ignored.
    """
    resp.body = 'Success'
    resp.status = falcon.HTTP_201
def on_post(self, req, resp, dataset_info, **kwargs):
    """Create a new dataset on the service

    This method will create a new empty dataset, and returns a 201 CREATED
    with Location header filled with the URI of new dataset.

    :param HTTPUserDatasetDTO dataset_info: HTTP Client dataset information
    :query int dataset_type: The dataset type (optional)
    :returns: Location header with new path to dataset object
    :raises falcon.HTTPConflict: If the insert fails with error code 409.
    :raises falcon.HTTPInternalServerError: If the insert fails with any
        other error code.
    """
    dao = data_access.DatasetDAO()

    # Get dataset type
    # NOTE(review): when the query parameter is omitted this lookup is
    # keyed by None -- confirm get_dataset_types() supports that.
    dts_type = req.get_param_as_int("dataset_type")
    dataset_type = dao.get_dataset_types()[dts_type]["class"]

    id_dts, err = dao.insert_empty_dataset(
        dataset_type,
        name=dataset_info.name,
        description=dataset_info.description)

    if id_dts is None:
        if err[0] == 409:
            raise falcon.HTTPConflict(
                title="The dataset name is already used",
                description=err[1])
        # Any other failure is reported as a server error; a missing id
        # must never be answered with 201.
        raise falcon.HTTPInternalServerError(description=err[1])

    # Dataset created, everything is done
    resp.status = falcon.HTTP_201
    resp.body = json.dumps({"dataset": {"id": id_dts}})
    resp.location = "/datasets/" + str(id_dts)
def test_cluster_create(self):
    """
    Verify creating a cluster.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]

        test_cluster = make_new(CLUSTER_WITH_HOST)
        # Verify with creation
        manager.get.side_effect = (
            Exception,
            test_cluster,
            test_cluster,
            test_cluster
        )

        test_body = '{"network": "default"}'

        body = self.simulate_request(
            '/api/v0/cluster/development', method='PUT', body=test_body)
        # assertEqual rather than the deprecated assertEquals alias,
        # which Python 3.12 removed.
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        self.assertEqual('{}', body[0])

        # Verify with existing cluster
        manager.get.return_value = CLUSTER
        body = self.simulate_request(
            '/api/v0/cluster/development', method='PUT', body=test_body)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        self.assertEqual('{}', body[0])
def test_cluster_restart_create(self):
    """
    Verify creating a cluster restart.
    """
    # Process is patched because we don't want to exec the subprocess
    # during unittesting
    with mock.patch('cherrypy.engine.publish') as _publish, \
            mock.patch('commissaire.handlers.clusters.Process'):
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]

        test_cluster = make_new(CLUSTER_WITH_HOST)

        manager.get.side_effect = (
            test_cluster,
            Exception,
            MagicMock(StoreHandlerManager),
            make_new(CLUSTER_RESTART))

        body = self.simulate_request(
            '/api/v0/cluster/development/restart',
            method='PUT')
        # assertEqual rather than the deprecated assertEquals alias,
        # which Python 3.12 removed.
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        result = json.loads(body[0])
        self.assertEqual('in_process', result['status'])
        self.assertEqual([], result['restarted'])
        self.assertEqual([], result['in_process'])
def test_cluster_upgrade_create(self):
    """
    Verify creating a cluster upgrade.
    """
    # Process is patched because we don't want to exec the subprocess
    # during unittesting
    with mock.patch('cherrypy.engine.publish') as _publish, \
            mock.patch('commissaire.handlers.clusters.Process'):
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]

        manager.get.side_effect = (
            make_new(CLUSTER_WITH_FLAT_HOST),
            Exception,
            MagicMock(StoreHandlerManager),
            make_new(CLUSTER_UPGRADE))

        # Verify with creation
        body = self.simulate_request(
            '/api/v0/cluster/development/upgrade',
            method='PUT')
        # assertEqual rather than the deprecated assertEquals alias,
        # which Python 3.12 removed.
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        result = json.loads(body[0])
        self.assertEqual('in_process', result['status'])
        self.assertEqual([], result['upgraded'])
        self.assertEqual([], result['in_process'])
def test_cluster_deploy_create(self):
    """
    Verify creating a cluster deploy.
    """
    # Process is patched because we don't want to exec the subprocess
    # during unittesting
    with mock.patch('cherrypy.engine.publish') as _publish, \
            mock.patch('commissaire.handlers.clusters.Process'):
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]

        manager.get.side_effect = (
            make_new(CLUSTER_WITH_FLAT_HOST),
            Exception,
            MagicMock(StoreHandlerManager),
            make_new(CLUSTER_DEPLOY))

        # Verify with creation
        body = self.simulate_request(
            '/api/v0/cluster/development/deploy',
            method='PUT',
            body=json.dumps({'version': '1.0'}))
        # assertEqual rather than the deprecated assertEquals alias,
        # which Python 3.12 removed.
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        result = json.loads(body[0])
        self.assertEqual('in_process', result['status'])
        self.assertEqual([], result['deployed'])
        self.assertEqual([], result['in_process'])
def on_post(self, req, resp, design_id):
    """Ingest the request body into a design using the named ingester.

    The ``ingester`` query parameter selects the plugin. A non-empty body
    is passed to ``self.ingester.ingest_data`` and the parsed items are
    returned as JSON with ``201 Created``. Errors are reported through
    ``self.return_error``.

    :param req: Falcon request carrying the query parameter and the body.
    :param resp: Falcon response that receives the items or the error.
    :param design_id: Identifier of the design the content belongs to.
    """
    ingester_name = req.params.get('ingester')

    # Guard clause: nothing can be ingested without a named plugin.
    if ingester_name is None:
        self.error(
            None, "DesignsPartsResource POST requires parameter 'ingester'")
        self.return_error(
            resp,
            falcon.HTTP_400,
            message="POST requires parameter 'ingester'",
            retry=False)
        return

    try:
        raw_body = req.stream.read(req.content_length or 0)

        if not raw_body:
            self.return_error(
                resp,
                falcon.HTTP_400,
                message="Empty body not supported",
                retry=False)
            return

        parsed_items = self.ingester.ingest_data(
            plugin_name=ingester_name,
            design_state=self.state_manager,
            content=raw_body,
            design_id=design_id,
            context=req.context)
        resp.status = falcon.HTTP_201
        simple_items = [item.obj_to_simple() for item in parsed_items]
        resp.body = json.dumps(simple_items)
    except ValueError:
        self.return_error(
            resp,
            falcon.HTTP_500,
            message="Error processing input",
            retry=False)
    except LookupError:
        self.return_error(
            resp,
            falcon.HTTP_400,
            message="Ingester %s not registered" % ingester_name,
            retry=False)
def test_create_task(self, mocker, blank_state):
    """POSTing a ``verify_site`` task body yields 201 and a Location header."""
    mocker.patch('oslo_policy.policy.Enforcer')

    ingester = mocker.MagicMock()
    wrapped_orchestrator = Orchestrator(
        state_manager=blank_state, ingester=ingester)
    orch = mocker.MagicMock(spec=Orchestrator, wraps=wrapped_orchestrator)

    ctx = DrydockRequestContext()
    policy_engine = policy.DrydockPolicy()

    task_request = {
        'action': 'verify_site',
        'design_ref': 'http://foo.com',
    }
    json_body = json.dumps(task_request).encode('utf-8')

    # Mock policy enforcement
    policy_engine.enforcer.configure_mock(**{'authorize.return_value': True})

    api = TasksResource(orchestrator=orch, state_manager=blank_state)

    # Configure context
    ctx.project_id = uuid.uuid4().hex
    ctx.user_id = uuid.uuid4().hex
    ctx.roles = ['admin']
    ctx.set_policy_engine(policy_engine)

    # Configure mocked request and response
    req = mocker.MagicMock(spec=falcon.Request)
    req.content_type = 'application/json'
    req.stream.read.return_value = json_body
    resp = mocker.MagicMock(spec=falcon.Response)
    req.context = ctx

    api.on_post(req, resp)

    assert resp.status == falcon.HTTP_201
    assert resp.get_header('Location') is not None
def test_host_create(self):
    """
    Verify creation of a Host.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]
        manager.save.return_value = make_new(HOST)
        test_cluster = make_new(CLUSTER)
        manager.get.side_effect = (
            Exception,
            test_cluster,
            make_new(HOST),
            test_cluster,
            test_cluster)
        data = ('{"ssh_priv_key": "dGVzdAo=", "remote_user": "root",'
                ' "cluster": "cluster"}')
        body = self.simulate_request(
            '/api/v0/host/10.2.0.2', method='PUT', body=data)
        self.assertEqual(self.srmock.status, falcon.HTTP_201)
        self.assertEqual(
            json.loads(INITIAL_HOST_JSON),
            json.loads(body[0]))

        # Make sure creation fails if the cluster doesn't exist
        manager.get.side_effect = Exception
        body = self.simulate_request(
            '/api/v0/host/10.2.0.2', method='PUT', body=data)
        self.assertEqual(self.srmock.status, falcon.HTTP_409)
        self.assertEqual({}, json.loads(body[0]))

        # Make sure creation is idempotent if the request parameters
        # agree with an existing host.
        # Only this assignment is needed; an earlier one pairing the host
        # with test_cluster was overwritten before any request was made.
        manager.get.side_effect = (
            make_new(HOST),
            make_new(CLUSTER_WITH_FLAT_HOST))

        body = self.simulate_request(
            '/api/v0/host/10.2.0.2', method='PUT', body=data)
        # datasource's set should not have been called
        self.assertEqual(self.srmock.status, falcon.HTTP_200)
        self.assertEqual(json.loads(HOST_JSON), json.loads(body[0]))

        # Make sure creation fails if the request parameters conflict
        # with an existing host.
        manager.get.side_effect = (
            make_new(HOST),
            make_new(CLUSTER_WITH_HOST))
        bad_data = '{"ssh_priv_key": "boguskey"}'
        body = self.simulate_request(
            '/api/v0/host/10.2.0.2', method='PUT', body=bad_data)
        # datasource's set should not have been called once
        self.assertEqual({}, json.loads(body[0]))
        self.assertEqual(self.srmock.status, falcon.HTTP_409)
def test_network_create(self):
    """
    Verify creating a network.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]
        manager.list_store_handlers.return_value = [[
            EtcdStoreHandler, None, None]]

        test_network = networks.Network.new(name='default')
        # Verify with creation
        manager.get.side_effect = (
            Exception,
            test_network,
            test_network,
            test_network
        )

        test_body = (
            '{"name": "default", "type": "flannel_etcd", "options": {}}')

        body = self.simulate_request(
            '/api/v0/network/default', method='PUT', body=test_body)
        # assertEqual rather than the deprecated assertEquals alias,
        # which Python 3.12 removed.
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        result = json.loads(body[0])
        self.assertEqual('default', result['name'])
        self.assertEqual('flannel_etcd', result['type'])
        self.assertEqual({}, result['options'])

        # Verify with existing network
        # NOTE(review): CLUSTER is used as the stored network here --
        # looks copied from the cluster test; confirm it is intended.
        manager.get.return_value = CLUSTER
        body = self.simulate_request(
            '/api/v0/network/default', method='PUT', body=test_body)
        self.assertEqual(falcon.HTTP_201, self.srmock.status)
        # NOTE(review): result still holds the first response; the three
        # checks below do not inspect this second body.
        self.assertEqual('default', result['name'])
        self.assertEqual('flannel_etcd', result['type'])
        self.assertEqual({}, result['options'])

        # Verify failure with flannel_etcd is requests but there is no
        # etcd backend
        manager.list_store_handlers.return_value = []
        body = self.simulate_request(
            '/api/v0/network/default', method='PUT', body=test_body)
        self.assertEqual(falcon.HTTP_409, self.srmock.status)
def on_put(self, req, resp, name):
    """
    Handles PUT (or "initiate") requests for a Cluster restart.

    Answers 404 when the cluster does not exist, 200 with the current
    record when a restart is still running, and otherwise starts a new
    restart process, saves its record and answers 201.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being restarted.
    :type name: str
    """
    # Make sure the cluster name is valid.
    if not util.etcd_cluster_exists(name):
        self.logger.info(
            'Restart PUT requested for nonexistent cluster {0}'.format(
                name))
        resp.status = falcon.HTTP_404
        return

    # Looked up once and reused for the lookup, the clone and the save.
    store_manager = cherrypy.engine.publish('get-store-manager')[0]

    # If the operation is already in progress, return the current
    # status with response code 200 OK.
    try:
        cluster_restart = store_manager.get(ClusterRestart.new(name=name))
        self.logger.debug('Found a ClusterRestart for {0}'.format(name))
        if not cluster_restart.finished_at:
            self.logger.debug(
                'Cluster {0} restart already in progress'.format(name))
            resp.status = falcon.HTTP_200
            req.context['model'] = cluster_restart
            return
    except Exception:
        # This means one doesn't already exist. Exception (not a bare
        # except) so SystemExit and KeyboardInterrupt still propagate.
        pass

    # TODO: Move to a poll?
    args = (store_manager.clone(), name, 'restart')
    p = Process(target=clusterexec, args=args)
    p.start()

    self.logger.debug('Started restart in clusterexecpool for {0}'.format(
        name))
    cluster_restart = ClusterRestart.new(
        name=name,
        status='in_process',
        started_at=datetime.datetime.utcnow().isoformat()
    )
    store_manager.save(cluster_restart)
    resp.status = falcon.HTTP_201
    req.context['model'] = cluster_restart
def on_put(self, req, resp, name):
    """
    Handles PUT (or "initiate") requests for a Cluster upgrade.

    Answers 404 when the cluster does not exist, 200 with the current
    record when an upgrade is still running, and otherwise starts a new
    upgrade process, saves its record and answers 201.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being upgraded.
    :type name: str
    """
    # Make sure the cluster name is valid.
    if not util.etcd_cluster_exists(name):
        self.logger.info(
            'Upgrade PUT requested for nonexistent cluster {0}'.format(
                name))
        resp.status = falcon.HTTP_404
        return

    # Looked up once and reused for the lookup, the clone and the save.
    store_manager = cherrypy.engine.publish('get-store-manager')[0]

    # If the operation is already in progress, return the current
    # status with response code 200 OK.
    try:
        cluster_upgrade = store_manager.get(ClusterUpgrade.new(name=name))
        self.logger.debug('Found ClusterUpgrade for {0}'.format(name))
        if not cluster_upgrade.finished_at:
            self.logger.debug(
                'Cluster {0} upgrade already in progress'.format(name))
            resp.status = falcon.HTTP_200
            req.context['model'] = cluster_upgrade
            return
    except Exception:
        # This means one doesn't already exist. Exception (not a bare
        # except) so SystemExit and KeyboardInterrupt still propagate.
        pass

    # TODO: Move to a poll?
    args = (store_manager.clone(), name, 'upgrade')
    p = Process(target=clusterexec, args=args)
    p.start()

    self.logger.debug('Started upgrade in clusterexecpool for {0}'.format(
        name))
    cluster_upgrade = ClusterUpgrade.new(
        name=name,
        status='in_process',
        started_at=datetime.datetime.utcnow().isoformat()
    )
    store_manager.save(cluster_upgrade)
    resp.status = falcon.HTTP_201
    req.context['model'] = cluster_upgrade