我们从 Python 开源项目中,提取了以下 48 个代码示例,用于说明如何使用 falcon.HTTP_404。
def test_get_http_status(self):
    """Check ``falcon.get_http_status`` for valid, unknown and invalid codes.

    Ints, floats and numeric strings that truncate to 404 must all map to
    ``falcon.HTTP_404``; a code with no known reason phrase falls back to
    ``'Unknown'`` unless a default reason is passed; non-numeric input, 0,
    99 and negative values must raise ``ValueError``.
    """
    for code in (404, 404.3, '404.3', 404.9, '404'):
        assert falcon.get_http_status(code) == falcon.HTTP_404

    assert falcon.get_http_status(123) == '123 Unknown'
    assert falcon.get_http_status(123, 'Go Away') == '123 Go Away'

    # Each invalid input is listed once (0 used to be checked twice).
    for bad_code in ('not_a_number', 0, 99, -404.3, '-404', '-404.3'):
        with pytest.raises(ValueError):
            falcon.get_http_status(bad_code)
def get_album_art(song_id: hug.types.text, response=None):
    """Return the stored album art for ``song_id`` as an in-memory stream.

    Sets status 400 (and returns an explanatory string) when
    ``music_api_names`` has no ``'offline_api'`` entry, and status 404 (and
    returns ``None``) when the ``albumArts`` table has no row for the song.
    """
    try:
        offline_api = music_api_names['offline_api']
    except KeyError:
        response.status = falcon.HTTP_400
        return "Not in offline mode"

    connection = sqlite3.connect(offline_api._db_path)
    try:
        row = connection.execute(
            "SELECT albumArt FROM albumArts WHERE songId=?", [song_id]
        ).fetchone()
        if row:
            return BytesIO(row[0])
        response.status = falcon.HTTP_404
        return None
    finally:
        # Always release the connection, whichever branch returned.
        connection.close()
def on_get(self, req, resp, design_id):
    """Method Handler for GET design singleton.

    The optional ``source`` query parameter selects which view of the
    design is serialized: ``designed`` (the default) or ``compiled``.
    Any other value is rejected with 400; a ``DesignError`` raised while
    loading the design is reported as 404.

    :param req: Falcon request object
    :param resp: Falcon response object
    :param design_id: UUID of the design resource
    """
    source = req.params.get('source', 'designed')

    if source == 'compiled':
        load_design = self.orchestrator.get_effective_site
    elif source == 'designed':
        load_design = self.orchestrator.get_described_site
    else:
        # Reject unknown sources explicitly; otherwise no design would be
        # loaded and serialization would fail on ``None``.
        message = "Unknown design source %s" % source
        self.error(req.context, message)
        self.return_error(
            resp, falcon.HTTP_400, message=message, retry=False)
        return

    try:
        design = load_design(design_id)
        resp.body = json.dumps(design.obj_to_simple())
    except errors.DesignError:
        self.error(req.context, "Design %s not found" % design_id)
        self.return_error(
            resp,
            falcon.HTTP_404,
            message="Design %s not found" % design_id,
            retry=False)
def on_get(self, req, resp, task_id):
    """Handler for GET method: return one task serialized as JSON.

    Responds 400 when ``task_id`` is not a well-formed UUID string, 404 when
    the state manager returns no task for it, 200 with the task's dict
    representation on success, and 500 for any other failure.
    """
    try:
        task_uuid = uuid.UUID(task_id)
    except ValueError:
        # A malformed identifier is a client error, not a server fault.
        message = "Task ID %s is not a valid UUID" % task_id
        self.info(req.context, message)
        self.return_error(
            resp, falcon.HTTP_400, message=message, retry=False)
        return

    try:
        task = self.state_manager.get_task(task_uuid)

        if task is None:
            self.info(req.context, "Task %s does not exist" % task_id)
            self.return_error(
                resp,
                falcon.HTTP_404,
                message="Task %s does not exist" % task_id,
                retry=False)
            return

        resp.body = json.dumps(task.to_dict())
        resp.status = falcon.HTTP_200
    except Exception as ex:
        self.error(req.context, "Unknown error: %s" % (str(ex)))
        self.return_error(
            resp, falcon.HTTP_500, message="Unknown error", retry=False)
def on_get(self, req, resp):
    """Look up the zen line named by the ``q`` query parameter.

    Always answers as UTF-8 plain text: 400 when ``q`` is missing or empty,
    404 when it is not a key of ``zenlines``, otherwise 200 with the line.
    """
    key = req.get_param('q')
    resp.content_type = 'text/plain; charset=utf-8'

    if not key:
        text, status = 'Bad Request', falcon.HTTP_400
    else:
        try:
            text, status = zenlines[key], falcon.HTTP_200
        except KeyError:
            text, status = 'Not Found', falcon.HTTP_404

    resp.body = text
    resp.status = status
def on_get(self, req, resp, project=None, repo=None):
    """Return results, optionally narrowed to a project or a repository.

    A given ``project`` must exist in the database, otherwise the reply is
    404 with an error object. The ``latest`` and ``weeks_ago`` query
    parameters choose between "latest" queries and timestamp-bounded ones;
    a repository query always uses the timestamp.
    """
    if project and not self.db.check_project_exists(project):
        resp.body = json.dumps({'Error': 'Project key {} not found!'.format(project)})
        resp.status = falcon.HTTP_404
        return

    latest = req.get_param_as_bool(name='latest', required=False)
    weeks_ago = req.get_param_as_int(name='weeks_ago', required=False)
    target_timestamp = get_epoch_time_of_weeks_ago(weeks=weeks_ago)

    if project and repo:
        results = self.db.get_results_for_repo(
            reponame=repo, timestamp=target_timestamp)
    elif project and latest:
        results = self.db.get_latest_results_for_project(project=project)
    elif project:
        results = self.db.get_results_for_project(
            project=project, timestamp=target_timestamp)
    elif latest:
        results = self.db.get_latest_results()
    else:
        results = self.db.get_all_results(timestamp=target_timestamp)

    resp.body = json.dumps({'results': format_results(results)})
    resp.status = falcon.HTTP_200
def test_cluster_delete(self):
    """
    Verify deleting a cluster.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]

        # Verify with proper deletion
        manager.get.return_value = mock.MagicMock()
        body = self.simulate_request(
            '/api/v0/cluster/development', method='DELETE')
        # Get is called to verify cluster exists
        # (assertEqual: the assertEquals alias no longer exists in 3.12+)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual('{}', body[0])

        # Verify when key doesn't exist
        manager.delete.side_effect = etcd.EtcdKeyNotFound
        body = self.simulate_request(
            '/api/v0/cluster/development', method='DELETE')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
        self.assertEqual('{}', body[0])
def test_cluster_hosts_retrieve(self):
    """
    Verify retrieving a cluster host list.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store]

        # When the store returns a cluster, its host list comes back with 200.
        store.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
        response = self.simulate_request('/api/v0/cluster/cluster/hosts')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual(['10.2.0.2'], json.loads(response[0]))

        # When the store lookup raises, the reply is 404 with an empty object.
        store.get.side_effect = Exception
        response = self.simulate_request('/api/v0/cluster/bogus/hosts')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
        self.assertEqual({}, json.loads(response[0]))
def test_cluster_host_insert(self):
    """
    Verify insertion of host in a cluster.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store]

        # When the store returns a cluster, the PUT answers 200 and {}.
        store.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
        insert_url = '/api/v0/cluster/developent/hosts/10.2.0.3'
        response = self.simulate_request(insert_url, method='PUT')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual({}, json.loads(response[0]))

        # When the store lookup raises, the PUT answers 404 and {}.
        store.get.side_effect = Exception
        bogus_url = '/api/v0/cluster/bogus/hosts/10.2.0.3'
        response = self.simulate_request(bogus_url, method='PUT')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
        self.assertEqual({}, json.loads(response[0]))
def test_cluster_host_delete(self):
    """
    Verify deletion of host in a cluster.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store]

        # When the store returns a cluster, the DELETE answers 200 and {}.
        store.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
        delete_url = '/api/v0/cluster/development/hosts/10.2.0.2'
        response = self.simulate_request(delete_url, method='DELETE')
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual({}, json.loads(response[0]))

        # When the store lookup raises, the DELETE answers 404 and {}.
        store.get.side_effect = Exception
        bogus_url = '/api/v0/cluster/bogus/hosts/10.2.0.2'
        response = self.simulate_request(bogus_url, method='DELETE')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
        self.assertEqual({}, json.loads(response[0]))
def test_host_creds_retrieve(self):
    """
    Verify retrieving Host Credentials.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store]

        # When the store returns the host, its credentials come back with 200.
        store.get.return_value = make_new(HOST)
        response = self.simulate_request('/api/v0/host/10.2.0.2/creds')
        self.assertEqual(self.srmock.status, falcon.HTTP_200)
        expected_creds = json.loads(HOST_CREDS_JSON)
        self.assertEqual(expected_creds, json.loads(response[0]))

        # When the store lookup raises, the reply is 404 with an empty object.
        store.reset_mock()
        store.get.side_effect = Exception
        response = self.simulate_request('/api/v0/host/10.9.9.9/creds')
        self.assertEqual(self.srmock.status, falcon.HTTP_404)
        self.assertEqual({}, json.loads(response[0]))
def test_network_delete(self):
    """
    Verify deleting a network.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]

        # Verify with proper deletion
        manager.get.return_value = mock.MagicMock()
        body = self.simulate_request(
            '/api/v0/network/development', method='DELETE')
        # Get is called to verify network exists
        # (assertEqual: the assertEquals alias no longer exists in 3.12+)
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
        self.assertEqual('{}', body[0])

        # Verify when key doesn't exist
        manager.delete.side_effect = etcd.EtcdKeyNotFound
        body = self.simulate_request(
            '/api/v0/network/development', method='DELETE')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
        self.assertEqual('{}', body[0])
def on_get(self, req, resp, name):
    """
    Handles GET requests for Cluster hosts.

    Responds 200 with the cluster's ``hostset`` as a JSON list, or 404
    when the cluster cannot be fetched from the store.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being requested.
    :type name: str
    """
    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        cluster = store_manager.get(Cluster.new(name=name))
    except Exception:
        # Any store failure is reported as "not found". Unlike a bare
        # ``except:``, KeyboardInterrupt/SystemExit still propagate.
        resp.status = falcon.HTTP_404
        return

    resp.body = json.dumps(cluster.hostset)
    resp.status = falcon.HTTP_200
def on_get(self, req, resp):
    """
    Handles GET requests for Hosts.

    On success the listing is stored in ``req.context['model']`` with
    status 200. An empty listing, or any exception while fetching it,
    yields status 404 and a ``None`` model.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    """
    try:
        listing = cherrypy.engine.publish('get-store-manager')[0].list(
            Hosts(hosts=[]))
        if len(listing.hosts) == 0:
            # Route the "no hosts" case through the failure handler below.
            raise Exception()
        resp.status = falcon.HTTP_200
        req.context['model'] = listing
    except Exception:
        # This was originally a "no content" but a 404 was judged to make
        # more sense if there are no hosts.
        self.logger.warn(
            'Store does not have any hosts. Returning [] and 404.')
        resp.status = falcon.HTTP_404
        req.context['model'] = None
def on_get(self, req, resp):
    """
    Handles GET requests for Networks.

    On success the body is a JSON list of network names with status 200.
    An empty listing, or any exception while fetching or serializing it,
    yields status 404 and a ``None`` model in ``req.context``.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    """
    try:
        listing = cherrypy.engine.publish('get-store-manager')[0].list(
            Networks(networks=[]))
        if len(listing.networks) == 0:
            # Route the "no networks" case through the failure handler below.
            raise Exception()
        resp.status = falcon.HTTP_200
        names = [entry.name for entry in listing.networks]
        resp.body = json.dumps(names)
    except Exception:
        self.logger.warn(
            'Store does not have any networks. Returning [] and 404.')
        resp.status = falcon.HTTP_404
        req.context['model'] = None
def on_get(self, req, resp, name):
    """
    Handles retrieval of an existing Network.

    On success the network is stored in ``req.context['model']`` with
    status 200; any failure while fetching it yields 404.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The friendly name of the network.
    :type name: str
    """
    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        network = store_manager.get(Network.new(name=name))
        resp.status = falcon.HTTP_200
        req.context['model'] = network
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are not turned into a 404.
        resp.status = falcon.HTTP_404
        return
def test_second_hook_raise_HTTPNotFound(self):
    """POSTing with a ``URL-METHODS`` header of ``GET, PUT`` must give 404."""
    response = self.simulate_post(
        headers={
            'Content-Type': 'application/json',
            'URL-METHODS': 'GET, PUT',
        },
        body=self.body,
    )
    self.assertEqual(falcon.HTTP_404, response.status)
def test_wrong_router(self):
    """A GET on ``/some/wrong/path`` must be answered with 404."""
    response = self.simulate_get(path='/some/wrong/path', body=self.body)
    actual_status = response.status
    self.assertEqual(falcon.HTTP_404, actual_status)
def test_skip_process_resource(self):
    """Request a path with no route and inspect the shared ``context``.

    Only ``/`` is routed, so ``/404`` must answer 404. ``start_time`` and
    ``end_time`` must be recorded, ``mid_time`` must not, and
    ``req_succeeded`` must be falsy.
    """
    global context

    api = falcon.API(middleware=[RequestTimeMiddleware()])
    api.add_route('/', MiddlewareClassResource())
    result = testing.TestClient(api).simulate_request(path='/404')

    assert result.status == falcon.HTTP_404
    assert 'start_time' in context
    assert 'mid_time' not in context
    assert 'end_time' in context
    assert not context['req_succeeded']
def test_404_without_body(self, client):
    """``NotFoundResource`` must answer 404 with empty content."""
    client.app.add_route('/404', NotFoundResource())
    result = client.simulate_request(path='/404')

    assert result.status == falcon.HTTP_404
    assert not result.content
def test_404_with_body(self, client):
    """``NotFoundResourceWithBody`` must answer 404 with a JSON error body."""
    client.app.add_route('/404', NotFoundResourceWithBody())
    result = client.simulate_request(path='/404')

    assert result.status == falcon.HTTP_404
    assert result.content
    assert result.json == {
        u'title': u'404 Not Found',
        u'description': u'Not Found'
    }
def on_get(self, req, resp, design_id, kind, name):
    """Return one part of a design, selected by ``kind`` and ``name``.

    The optional ``source`` query parameter picks the ``designed`` (default)
    or ``compiled`` view of the design; other values are rejected with 400.
    An unknown ``kind`` or a ``DesignError`` yields 404; any other failure
    yields 500.

    :param req: Falcon request object
    :param resp: Falcon response object
    :param design_id: identifier of the design to read
    :param kind: part kind, e.g. ``Site``, ``Network``, ``BaremetalNode``
    :param name: name of the part (not used for ``Site``)
    """
    source = req.params.get('source', 'designed')

    if source not in ('compiled', 'designed'):
        # Without this guard no design is loaded and the part lookup
        # below would fail on ``None``.
        message = "Unknown design source %s" % source
        self.error(req.context, message)
        self.return_error(
            resp, falcon.HTTP_400, message=message, retry=False)
        return

    try:
        if source == 'compiled':
            design = self.orchestrator.get_effective_site(design_id)
        else:
            design = self.orchestrator.get_described_site(design_id)

        part = None
        if kind == 'Site':
            part = design.get_site()
        elif kind == 'Network':
            part = design.get_network(name)
        elif kind == 'NetworkLink':
            part = design.get_network_link(name)
        elif kind == 'HardwareProfile':
            part = design.get_hardware_profile(name)
        elif kind == 'HostProfile':
            part = design.get_host_profile(name)
        elif kind == 'BaremetalNode':
            part = design.get_baremetal_node(name)
        else:
            self.error(req.context, "Kind %s unknown" % kind)
            self.return_error(
                resp,
                falcon.HTTP_404,
                message="Kind %s unknown" % kind,
                retry=False)
            return

        resp.body = json.dumps(part.obj_to_simple())
    except errors.DesignError as dex:
        self.error(req.context, str(dex))
        self.return_error(
            resp, falcon.HTTP_404, message=str(dex), retry=False)
    except Exception as exc:
        self.error(req.context, str(exc))
        # Was ``resp.falcon.HTTP_500`` (attribute access on resp), which
        # made this handler itself raise AttributeError.
        self.return_error(
            resp, falcon.HTTP_500, message=str(exc), retry=False)
def not_found(message="The requested resource does not exist"):
    """Abort the current request by raising ``falcon.HTTPNotFound``.

    :param message: text passed as the error's ``description``.
    :raises falcon.HTTPNotFound: always.
    """
    error = falcon.HTTPNotFound(description=message, code=falcon.HTTP_404)
    raise error
def on_get(self, req, resp, account_id):
    """Handles GET requests.

    Responds 404 when ``Actions.get_watched`` returns ``None`` for the
    account; otherwise 200 with the account id, its watched value and its
    servers as a JSON object.
    """
    watched = Actions.get_watched(account_id)
    if watched is None:
        resp.status = falcon.HTTP_404
        return

    servers = Actions.get_servers(account_id)
    resp.status = falcon.HTTP_200  # This is the default status
    resp.body = json.dumps(
        dict(account_id=account_id, watched=watched, servers=servers))
def on_get(self, req, resp, server_id):
    """Handles GET requests.

    Responds 404 when ``Actions.get_accounts`` returns ``None`` for the
    server; otherwise 200 with the server id and its accounts as JSON.
    """
    accounts = Actions.get_accounts(server_id)
    if accounts is None:
        resp.status = falcon.HTTP_404
        return

    payload = dict(server_id=server_id, accounts=accounts)
    resp.status = falcon.HTTP_200  # This is the default status
    resp.body = json.dumps(payload)
def on_put(self, req, resp, server_id):
    """Handles PUT requests.

    The endpoint is currently disabled: it sets status 404 and returns
    immediately. Everything after that ``return`` is unreachable and is
    kept only as the parked implementation, which would read a JSON body
    with an ``accounts`` list and link each account to ``server_id``.
    """
    resp.status = falcon.HTTP_404  # Disabled!
    return

    # ---- unreachable from here on (see docstring) ----
    try:
        raw_json = req.stream.read()
    except Exception as ex:
        # NOTE(review): ``ex.message`` only exists on Python 2 exceptions;
        # revisit before re-enabling.
        raise falcon.HTTPError(falcon.HTTP_400, 'Error', ex.message)

    try:
        # NOTE(review): the ``encoding`` argument of json.loads was removed
        # in Python 3.9; revisit before re-enabling.
        result_json = json.loads(raw_json, encoding='utf-8')
    except ValueError:
        raise falcon.HTTPError(falcon.HTTP_400, 'Malformed JSON',
                               'Could not decode the request body. The '
                               'JSON was incorrect.')

    # A server with no account list is reported as 404.
    accounts = Actions.get_accounts(server_id)
    if accounts is None:
        resp.status = falcon.HTTP_404
        return

    # Register the link in both directions: server -> account, account -> server.
    accounts = result_json['accounts']
    for account_id in accounts:
        Actions.add_account(server_id, account_id)
        account.Actions.add_server(account_id, server_id)

    resp.status = falcon.HTTP_200  # This is the default status
    # Re-read the accounts so the reply reflects the additions just made.
    jsonresp = {'server_id': server_id, 'accounts': Actions.get_accounts(server_id)}
    resp.body = json.dumps(jsonresp)
def on_get(self, req, resp, project=None):
    """List a project's repositories, or all projects when none is given.

    A given ``project`` that does not exist in the database yields 404 with
    an error object; otherwise the reply is 200 with either a
    ``repositories`` or a ``projects`` list.
    """
    if project and not self.db.check_project_exists(project):
        resp.body = json.dumps({'Error': 'Project key {} not found!'.format(project)})
        resp.status = falcon.HTTP_404
        return

    if project:
        payload = {'repositories': self.db.get_repos_for_project(project)}
    else:
        payload = {'projects': self.db.get_projects()}

    resp.body = json.dumps(payload)
    resp.status = falcon.HTTP_200
def test_clusters_listing_with_no_etcd_result(self):
    """
    Verify listing Clusters handles no etcd result properly.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        # The published result carries an EtcdKeyNotFound, not a store manager.
        publish_mock.return_value = [[[], etcd.EtcdKeyNotFound()]]

        response = self.simulate_request('/api/v0/clusters')

        self.assertEqual(self.srmock.status, falcon.HTTP_404)
        self.assertEqual('{}', response[0])
def test_cluster_retrieve(self):
    """
    Verify retrieving a cluster.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store]

        expected_cluster = make_new(CLUSTER_WITH_HOST)

        # When the store returns the cluster, it is serialized with its hosts.
        store.get.return_value = expected_cluster
        store.list.return_value = make_new(HOSTS)
        response = self.simulate_request('/api/v0/cluster/development')
        self.assertEqual(self.srmock.status, falcon.HTTP_200)
        expected_json = json.loads(expected_cluster.to_json_with_hosts())
        self.assertEqual(expected_json, json.loads(response[0]))

        # When the store lookup raises, the reply is 404 with an empty object.
        store.get.reset_mock()
        store.get.side_effect = Exception
        response = self.simulate_request('/api/v0/cluster/bogus')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
        self.assertEqual({}, json.loads(response[0]))
def test_hosts_listing_with_no_hosts(self):
    """
    Verify listing Hosts when no hosts exists.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        # The patched publish call returns an empty Hosts model directly.
        publish_mock.return_value = Hosts(hosts=[])

        response = self.simulate_request('/api/v0/hosts')

        self.assertEqual(self.srmock.status, falcon.HTTP_404)
        self.assertEqual({}, json.loads(response[0]))
def test_hosts_listing_with_no_etcd_result(self):
    """
    Verify listing hosts handles no etcd result properly.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        # The published result carries an exception class, not a store manager.
        publish_mock.return_value = [[[], Exception]]

        response = self.simulate_request('/api/v0/hosts')

        self.assertEqual(self.srmock.status, falcon.HTTP_404)
        self.assertEqual('{}', response[0])
def test_host_delete(self):
    """
    Verify deleting a Host.
    """
    with mock.patch('cherrypy.engine.publish') as _publish:
        manager = mock.MagicMock(StoreHandlerManager)
        _publish.return_value = [manager]

        # Verify if the host exists the data is returned
        manager.get.return_value = make_new(HOST)

        # Verify deleting of an existing host works
        body = self.simulate_request(
            '/api/v0/host/10.2.0.2', method='DELETE')
        # datasource's delete should have been called once
        # (assertEqual: the assertEquals alias no longer exists in 3.12+)
        self.assertEqual(1, manager.delete.call_count)
        self.assertEqual(self.srmock.status, falcon.HTTP_200)
        self.assertEqual({}, json.loads(body[0]))

        # Verify deleting of a non existing host returns the proper result
        manager.reset_mock()
        manager.delete.side_effect = Exception
        body = self.simulate_request(
            '/api/v0/host/10.9.9.9', method='DELETE')
        self.assertEqual(1, manager.delete.call_count)
        self.assertEqual(self.srmock.status, falcon.HTTP_404)
        self.assertEqual({}, json.loads(body[0]))
def test_networks_listing_with_no_networks(self):
    """
    Verify listing Networks when no networks exist.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        # The store lists an empty Networks model.
        store.list.return_value = networks.Networks(networks=[])
        publish_mock.return_value = [store]

        response = self.simulate_request('/api/v0/networks')

        self.assertEqual(self.srmock.status, falcon.HTTP_404)
        self.assertEqual({}, json.loads(response[0]))
def test_networks_listing_with_no_etcd_result(self):
    """
    Verify listing Networks handles no etcd result properly.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        # The published result carries an EtcdKeyNotFound, not a store manager.
        publish_mock.return_value = [[[], etcd.EtcdKeyNotFound()]]

        response = self.simulate_request('/api/v0/networks')

        self.assertEqual(self.srmock.status, falcon.HTTP_404)
        self.assertEqual('{}', response[0])
def test_network_retrieve(self):
    """
    Verify retrieving a network.
    """
    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store]

        expected_network = networks.Network.new(name='default')

        # When the store returns the network, its JSON form comes back with 200.
        store.get.return_value = expected_network
        response = self.simulate_request('/api/v0/network/default')
        self.assertEqual(self.srmock.status, falcon.HTTP_200)
        expected_json = json.loads(expected_network.to_json())
        self.assertEqual(expected_json, json.loads(response[0]))

        # When the store lookup raises, the reply is 404 with an empty object.
        store.get.reset_mock()
        store.get.side_effect = Exception
        response = self.simulate_request('/api/v0/network/bogus')
        self.assertEqual(falcon.HTTP_404, self.srmock.status)
        self.assertEqual({}, json.loads(response[0]))
def on_get(self, req, resp, name):
    """
    Handles retrieval of an existing Cluster.

    Responds 404 when fetching the cluster raises (the error is logged) or
    when the fetched value is falsy; otherwise 200 with the cluster
    serialized together with its hosts.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being requested.
    :type name: str
    """
    cluster = None
    try:
        cluster = cherrypy.engine.publish('get-store-manager')[0].get(
            Cluster.new(name=name))
    except Exception as error:
        self.logger.error("{0}: {1}".format(type(error), error))

    # Both a failed lookup and a falsy result end up here.
    if not cluster:
        resp.status = falcon.HTTP_404
        return

    self._calculate_hosts(cluster)
    # Have to set resp.body explicitly to include Hosts.
    resp.body = cluster.to_json_with_hosts()
    resp.status = falcon.HTTP_200
    self.logger.debug('Cluster retrieval: {0}'.format(resp.body))
def on_get(self, req, resp, name, address):
    """
    Handles GET requests for individual hosts in a Cluster.
    This is a membership test, returning 200 OK if the host
    address is part of the cluster, or else 404 Not Found.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being requested.
    :type name: str
    :param address: The address of the Host being requested.
    :type address: str
    """
    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        cluster = store_manager.get(Cluster.new(name=name))
    except Exception:
        # Any store failure counts as "cluster not found". Unlike a bare
        # ``except:``, KeyboardInterrupt/SystemExit still propagate.
        resp.status = falcon.HTTP_404
        return

    if address in cluster.hostset:
        resp.status = falcon.HTTP_200
    else:
        resp.status = falcon.HTTP_404
def on_get(self, req, resp, name):
    """
    Handles GET (or "status") requests for a tree image deployment
    across a Cluster.

    Responds 404 for a nonexistent cluster, 204 when no deployment record
    can be fetched, and 200 with the record in ``req.context['model']``
    otherwise.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster undergoing deployment.
    :type name: str
    """
    if not util.etcd_cluster_exists(name):
        self.logger.info(
            'Deploy GET requested for nonexistent cluster {0}'.format(
                name))
        resp.status = falcon.HTTP_404
        return

    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        cluster_deploy = store_manager.get(ClusterDeploy.new(name=name))
        self.logger.debug('Found ClusterDeploy for {0}'.format(name))
    except Exception:
        # Return "204 No Content" if we have no status,
        # meaning no deployment is in progress. The client
        # can't be expected to know that, so it's not a
        # client error (4xx).
        # ``except Exception`` (not a bare ``except:``) so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        self.logger.debug((
            'Deploy GET requested for {0} but no deployment '
            'has ever been executed.').format(name))
        resp.status = falcon.HTTP_204
        return

    resp.status = falcon.HTTP_200
    req.context['model'] = cluster_deploy
def on_get(self, req, resp, name):
    """
    Handles GET (or "status") requests for a Cluster upgrade.

    Responds 404 for a nonexistent cluster, 204 when no upgrade record
    can be fetched, and 200 with the record in ``req.context['model']``
    otherwise.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being upgraded.
    :type name: str
    """
    if not util.etcd_cluster_exists(name):
        self.logger.info(
            'Upgrade GET requested for nonexistent cluster {0}'.format(
                name))
        resp.status = falcon.HTTP_404
        return

    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        cluster_upgrade = store_manager.get(ClusterUpgrade.new(name=name))
        self.logger.debug('Found ClusterUpgrade for {0}'.format(name))
    except Exception:
        # Return "204 No Content" if we have no status,
        # meaning no upgrade is in progress. The client
        # can't be expected to know that, so it's not a
        # client error (4xx).
        # ``except Exception`` (not a bare ``except:``) so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        self.logger.debug((
            'Upgrade GET requested for {0} but no upgrade '
            'has ever been executed.').format(name))
        resp.status = falcon.HTTP_204
        return

    resp.status = falcon.HTTP_200
    req.context['model'] = cluster_upgrade
def on_get(self, req, resp, address):
    """
    Handles retrieval of existing Host credentials.

    Responds 200 with the host's ``ssh_priv_key`` and ``remote_user``
    (``'root'`` when unset) as JSON, or 404 on any failure while fetching
    or serializing them.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param address: The address of the Host being requested.
    :type address: str
    """
    # TODO: Verify input
    # TODO: Decide if this should be a model or if it makes sense to
    #       stay a subset off of Host and bypass the req.context
    #       middleware system.
    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        host = store_manager.get(Host.new(address=address))
        resp.status = falcon.HTTP_200
        body = {
            'ssh_priv_key': host.ssh_priv_key,
            'remote_user': host.remote_user or 'root',
        }
        resp.body = json.dumps(body)
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are not turned into a 404.
        resp.status = falcon.HTTP_404
        return
def sink(req, resp):
    """Dispatch a request to a controller function derived from its path.

    The non-empty path segments choose the target: ``/<ctrl>/<func>/...``
    calls ``func`` on controller ``ctrl``, ``/<func>`` calls ``func`` on the
    ``index`` controller, and ``/`` calls ``index`` on ``index``. A missing
    controller or function yields 404. An exception from the handler is
    logged and reported as 500 with the exception text as the body. The
    handler's return value becomes the body only when it did not set
    ``resp.body`` itself.
    """
    do_log_history(req, resp)

    # Build a real list: on Python 3 ``filter`` returns a lazy object that
    # supports neither len() nor indexing.
    paths = [segment for segment in req.path.split('/') if segment != '']

    ctrl_name = func_name = 'index'
    if len(paths) >= 2:
        ctrl_name, func_name = paths[0], paths[1]
    elif len(paths) == 1:
        func_name = paths[0]

    # NOTE(review): func_name comes straight from the URL and is resolved
    # with getattr; confirm controllers expose only intended callables.
    ctrl = loader.ctrl(ctrl_name)
    if ctrl is None or not hasattr(ctrl, func_name):
        resp.status = falcon.HTTP_404
        resp.body = "Not Found"
    else:
        try:
            content = getattr(ctrl, func_name)(req, resp)
            if resp.body is None:
                # NOTE(review): ``unicode`` is the Python 2 text type; this
                # branch needs porting before running on Python 3.
                if isinstance(content, unicode):
                    resp.body = unicode.encode(content, 'utf-8', 'ignore')
                elif isinstance(content, str):
                    resp.body = content
                else:
                    resp.body = json.dumps(content)
        except Exception as ex:
            log_error(ex)
            resp.status = falcon.HTTP_500
            # NOTE(review): this returns the raw exception text to the
            # client; a generic message may be preferable.
            resp.body = str(ex)

    do_log_result(req, resp)
def test_custom_router_find_should_be_used():
    """A router passed to ``falcon.API`` must be consulted for every request.

    The custom ``find`` returns a 4-tuple with a URI template, a 4-tuple
    with ``None`` as the template, a 3-tuple, a tuple of ``None`` values,
    or plain ``None``, depending on the requested path.
    """

    def resource(req, resp, **kwargs):
        resp.body = '{{"uri_template": "{0}"}}'.format(req.uri_template)

    class CustomRouter(object):
        def __init__(self):
            self.reached_backwards_compat = False

        def find(self, uri):
            if uri == '/404/backwards-compat':
                self.reached_backwards_compat = True
                return (None, None, None)

            method_map = {'GET': resource}
            known = {
                '/test/42':
                    (resource, method_map, {}, '/test/{id}'),
                '/test/42/no-uri-template':
                    (resource, method_map, {}, None),
                '/test/42/uri-template/backwards-compat':
                    (resource, method_map, {}),
            }
            # Unknown paths fall through to None.
            return known.get(uri)

    router = CustomRouter()
    client = testing.TestClient(falcon.API(router=router))

    expectations = (
        ('/test/42', b'{"uri_template": "/test/{id}"}'),
        ('/test/42/no-uri-template', b'{"uri_template": "None"}'),
        ('/test/42/uri-template/backwards-compat',
         b'{"uri_template": "None"}'),
    )
    for path, expected_content in expectations:
        response = client.simulate_request(path=path)
        assert response.content == expected_content

    for path in ('/404', '/404/backwards-compat'):
        response = client.simulate_request(path=path)
        assert not response.content
        assert response.status == falcon.HTTP_404

    assert router.reached_backwards_compat
def on_get(self, req, resp, design_id):
    """List the parts of a design as ``{'kind': ..., 'key': ...}`` entries.

    Responds 404 when the state manager raises ``DesignError`` for
    ``design_id``; otherwise 200 with a JSON list covering the site (listed
    under kind ``Region``), networks, network links, host profiles,
    hardware profiles and baremetal nodes, in that order.

    :param req: Falcon request object
    :param resp: Falcon response object
    :param design_id: identifier of the design to catalog
    """
    try:
        design = self.state_manager.get_design(design_id)
    except errors.DesignError:
        self.return_error(
            resp,
            falcon.HTTP_404,
            message="Design %s not found" % design_id,
            retry=False)
        # Stop here: ``design`` is unbound, so falling through would raise.
        return

    part_catalog = [{'kind': 'Region', 'key': design.get_site().get_id()}]

    part_groups = (
        ('Network', design.networks),
        ('NetworkLink', design.network_links),
        ('HostProfile', design.host_profiles),
        ('HardwareProfile', design.hardware_profiles),
        ('BaremetalNode', design.baremetal_nodes),
    )
    for kind, parts in part_groups:
        part_catalog.extend(
            {'kind': kind, 'key': part.get_id()} for part in parts)

    resp.body = json.dumps(part_catalog)
    resp.status = falcon.HTTP_200
    return
def test_cluster_hosts_overwrite(self):
    """
    Verify overwriting a cluster host list.
    """
    hosts_url = '/api/v0/cluster/development/hosts'

    with mock.patch('cherrypy.engine.publish') as publish_mock:
        store = mock.MagicMock(StoreHandlerManager)
        publish_mock.return_value = [store]

        def put_hosts(url, payload):
            """PUT ``payload`` to ``url`` and return the raw response body."""
            return self.simulate_request(url, method='PUT', body=payload)

        def expect(status, raw_body):
            """Check the recorded status and that the body decodes to {}."""
            self.assertEqual(status, self.srmock.status)
            self.assertEqual({}, json.loads(raw_body[0]))

        # A matching "old" list lets the new host list through.
        store.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
        expect(falcon.HTTP_200, put_hosts(
            hosts_url,
            '{"old": ["10.2.0.2"], "new": ["10.2.0.2", "10.2.0.3"]}'))

        # A body without the "old" key is a bad request.
        store.get.side_effect = KeyError
        expect(falcon.HTTP_400, put_hosts(
            hosts_url,
            '{"new": ["10.2.0.2", "10.2.0.3"]}'))

        # A body that is a list instead of an object is a bad request.
        store.get.side_effect = TypeError
        expect(falcon.HTTP_400, put_hosts(
            hosts_url,
            '["10.2.0.2", "10.2.0.3"]'))

        # A failing cluster lookup is reported as 404.
        store.get.side_effect = Exception
        expect(falcon.HTTP_404, put_hosts(
            '/api/v0/cluster/bogus/hosts',
            '{"old": ["10.2.0.2"], "new": ["10.2.0.2", "10.2.0.3"]}'))

        # An "old" list that differs from the current hosts is a conflict.
        store.get.side_effect = None
        expect(falcon.HTTP_409, put_hosts(
            hosts_url,
            '{"old": [], "new": ["10.2.0.2", "10.2.0.3"]}'))
def on_put(self, req, resp, name):
    """
    Handles PUT requests for Cluster hosts.
    This replaces the entire host list for a Cluster.

    The body must be a JSON object with ``old`` and ``new`` host lists.
    Responds 400 for an unparsable or malformed body, 404 when the cluster
    cannot be fetched, 409 when ``old`` does not match the cluster's
    current hosts, and 200 once the new list is saved.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being requested.
    :type name: str
    """
    # Pre-bound so the log line below is safe even if parsing fails first.
    req_body = None
    try:
        req_body = json.loads(req.stream.read().decode())
        old_hosts = set(req_body['old'])  # Ensures no duplicates
        new_hosts = set(req_body['new'])  # Ensures no duplicates
    except (KeyError, TypeError, ValueError):
        # ValueError covers malformed JSON and undecodable bytes, which
        # are client errors just like a missing key.
        self.logger.info(
            'Bad client PUT request for cluster "{0}": {1}'.
            format(name, req_body))
        resp.status = falcon.HTTP_400
        return

    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        cluster = store_manager.get(Cluster.new(name=name))
    except Exception:
        # Narrower than a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are not turned into a 404.
        resp.status = falcon.HTTP_404
        return

    # old_hosts must match current hosts to accept new_hosts.
    if old_hosts != set(cluster.hostset):
        self.logger.info(
            'Conflict setting hosts for cluster {0}'.format(name))
        self.logger.debug('{0} != {1}'.format(old_hosts, cluster.hostset))
        resp.status = falcon.HTTP_409
        return

    # FIXME: Need input validation.  For each new host,
    #        - Does the host exist at /commissaire/hosts/{IP}?
    #        - Does the host already belong to another cluster?

    # FIXME: Should guard against races here, since we're fetching
    #        the cluster record and writing it back with some parts
    #        unmodified.  Use either locking or a conditional write
    #        with the etcd 'modifiedIndex'.  Deferring for now.
    cluster.hostset = list(new_hosts)
    store_manager.save(cluster)
    resp.status = falcon.HTTP_200
def on_put(self, req, resp, name):
    """
    Handles PUT (or "initiate") requests for a Cluster restart.

    Responds 404 for a nonexistent cluster, 200 with the existing record
    when a restart is still unfinished, and 201 with a new record after
    starting a restart process.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being restarted.
    :type name: str
    """
    # Make sure the cluster name is valid.
    if not util.etcd_cluster_exists(name):
        self.logger.info(
            'Restart PUT requested for nonexistent cluster {0}'.format(
                name))
        resp.status = falcon.HTTP_404
        return

    # If the operation is already in progress, return the current
    # status with response code 200 OK.
    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        cluster_restart = store_manager.get(ClusterRestart.new(name=name))
        self.logger.debug('Found a ClusterRestart for {0}'.format(name))
        if not cluster_restart.finished_at:
            self.logger.debug(
                'Cluster {0} restart already in progress'.format(name))
            resp.status = falcon.HTTP_200
            req.context['model'] = cluster_restart
            return
    except Exception as error:
        # This means one doesn't already exist. ``except Exception`` (not
        # a bare ``except:``) keeps KeyboardInterrupt/SystemExit intact.
        self.logger.debug(
            'No existing ClusterRestart for {0}: {1}'.format(name, error))

    # TODO: Move to a poll?
    # Looked up again because the lookup above may have failed in the try.
    store_manager = cherrypy.engine.publish('get-store-manager')[0]
    args = (store_manager.clone(), name, 'restart')
    p = Process(target=clusterexec, args=args)
    p.start()

    self.logger.debug('Started restart in clusterexecpool for {0}'.format(
        name))
    cluster_restart = ClusterRestart.new(
        name=name,
        status='in_process',
        started_at=datetime.datetime.utcnow().isoformat()
    )
    store_manager.save(cluster_restart)
    resp.status = falcon.HTTP_201
    req.context['model'] = cluster_restart
def on_put(self, req, resp, name):
    """
    Handles PUT (or "initiate") requests for a Cluster upgrade.

    Responds 404 for a nonexistent cluster, 200 with the existing record
    when an upgrade is still unfinished, and 201 with a new record after
    starting an upgrade process.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param name: The name of the Cluster being upgraded.
    :type name: str
    """
    # Make sure the cluster name is valid.
    if not util.etcd_cluster_exists(name):
        self.logger.info(
            'Upgrade PUT requested for nonexistent cluster {0}'.format(
                name))
        resp.status = falcon.HTTP_404
        return

    # If the operation is already in progress, return the current
    # status with response code 200 OK.
    try:
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        cluster_upgrade = store_manager.get(ClusterUpgrade.new(name=name))
        self.logger.debug('Found ClusterUpgrade for {0}'.format(name))
        if not cluster_upgrade.finished_at:
            self.logger.debug(
                'Cluster {0} upgrade already in progress'.format(name))
            resp.status = falcon.HTTP_200
            req.context['model'] = cluster_upgrade
            return
    except Exception as error:
        # This means one doesn't already exist. ``except Exception`` (not
        # a bare ``except:``) keeps KeyboardInterrupt/SystemExit intact.
        self.logger.debug(
            'No existing ClusterUpgrade for {0}: {1}'.format(name, error))

    # TODO: Move to a poll?
    # Looked up again because the lookup above may have failed in the try.
    store_manager = cherrypy.engine.publish('get-store-manager')[0]
    args = (store_manager.clone(), name, 'upgrade')
    p = Process(target=clusterexec, args=args)
    p.start()

    self.logger.debug('Started upgrade in clusterexecpool for {0}'.format(
        name))
    cluster_upgrade = ClusterUpgrade.new(
        name=name,
        status='in_process',
        started_at=datetime.datetime.utcnow().isoformat()
    )
    store_manager.save(cluster_upgrade)
    resp.status = falcon.HTTP_201
    req.context['model'] = cluster_upgrade
def on_delete(self, req, resp, address):
    """
    Handles the Deletion of a Host.

    The host is dequeued from the watcher and deleted from the store
    (200 on success, 404 on any failure). In both cases the address is
    then removed, best-effort, from every cluster that lists it.

    :param req: Request instance that will be passed through.
    :type req: falcon.Request
    :param resp: Response instance that will be passed through.
    :type resp: falcon.Response
    :param address: The address of the Host being requested.
    :type address: str
    """
    resp.body = '{}'
    store_manager = cherrypy.engine.publish('get-store-manager')[0]
    try:
        host = Host.new(address=address)
        WATCHER_QUEUE.dequeue(host)
        store_manager.delete(host)
        self.logger.debug(
            'Deleted host {0} and dequeued it from the watcher.'.format(
                host.address))
        resp.status = falcon.HTTP_200
    except Exception:
        # ``except Exception`` (not a bare ``except:``) so that
        # KeyboardInterrupt/SystemExit are not reported as a 404.
        resp.status = falcon.HTTP_404

    # Also remove the host from all clusters.
    # Note: We've done all we need to for the host deletion,
    #       so if an error occurs from here just log it and
    #       return.
    try:
        clusters = store_manager.list(Clusters(clusters=[]))
    except Exception:
        self.logger.warn('Store does not have any clusters')
        return

    for cluster in clusters.clusters:
        try:
            self.logger.debug(
                'Checking cluster {0}'.format(cluster.name))
            if address in cluster.hostset:
                self.logger.info(
                    'Removing {0} from cluster {1}'.format(
                        address, cluster.name))
                cluster.hostset.remove(address)
                store_manager.save(cluster)
                self.logger.info(
                    '{0} has been removed from cluster {1}'.format(
                        address, cluster.name))
        except Exception:
            # One failing cluster must not stop the remaining clean-up.
            self.logger.warn(
                'Failed to remove {0} from cluster {1}'.format(
                    address, cluster.name))