Python falcon 模块,HTTP_404 实例源码

我们从 Python 开源项目中提取了以下 48 个代码示例,用于说明如何使用 falcon.HTTP_404。

项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_get_http_status(self):
        """Exercise ``falcon.get_http_status`` with valid, unknown and bad codes."""
        # Ints, floats and numeric strings around 404 must all map to the
        # canonical 404 status line.
        for code in (404, 404.3, '404.3', 404.9, '404'):
            assert falcon.get_http_status(code) == falcon.HTTP_404

        # A code with no predefined status line gets the default reason
        # unless one is supplied by the caller.
        assert falcon.get_http_status(123) == '123 Unknown'
        assert falcon.get_http_status(123, 'Go Away') == '123 Go Away'

        # Non-numeric, too-small and negative codes are rejected.  Each case
        # is listed once (the original checked ``0`` twice).
        for bad_code in ('not_a_number', 0, 99, -404.3, '-404', '-404.3'):
            with pytest.raises(ValueError):
                falcon.get_http_status(bad_code)
项目:MusicBot    作者:BjoernPetersen    | 项目源码 | 文件源码
def get_album_art(song_id: hug.types.text, response=None):
    """Return the stored album art for ``song_id`` as a byte stream.

    Sets a 400 status and returns an explanatory string when
    ``music_api_names`` has no ``'offline_api'`` entry.  Sets a 404 status
    and returns ``None`` when the query yields no row for ``song_id``.
    """
    try:
        offline_api = music_api_names['offline_api']
    except KeyError:
        response.status = falcon.HTTP_400
        return "Not in offline mode"

    connection = sqlite3.connect(offline_api._db_path)
    try:
        row = connection.execute(
            "SELECT albumArt FROM albumArts WHERE songId=?", [song_id]
        ).fetchone()
        if row:
            # First (and only) selected column holds the image bytes.
            return BytesIO(row[0])

        response.status = falcon.HTTP_404
        return None
    finally:
        # Always release the connection, whichever branch returned.
        connection.close()
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def on_get(self, req, resp, design_id):
        """Method Handler for GET design singleton.

        The ``source`` query parameter selects which view of the design is
        serialized: ``designed`` (the default) or ``compiled``.  Any other
        value is rejected with a 400 response; previously it left the design
        as ``None`` and failed with an unhandled ``AttributeError``.

        :param req: Falcon request object
        :param resp: Falcon response object
        :param design_id: UUID of the design resource
        """
        source = req.params.get('source', 'designed')

        if source not in ('compiled', 'designed'):
            message = "Invalid source %s" % source
            self.error(req.context, message)
            self.return_error(
                resp, falcon.HTTP_400, message=message, retry=False)
            return

        try:
            if source == 'compiled':
                design = self.orchestrator.get_effective_site(design_id)
            else:
                design = self.orchestrator.get_described_site(design_id)

            resp.body = json.dumps(design.obj_to_simple())
        except errors.DesignError:
            # The orchestrator signals an unknown design with DesignError.
            self.error(req.context, "Design %s not found" % design_id)
            self.return_error(
                resp,
                falcon.HTTP_404,
                message="Design %s not found" % design_id,
                retry=False)
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def on_get(self, req, resp, task_id):
        """Serialize the task identified by ``task_id`` into the response.

        Responds 404 when the state manager returns ``None`` for the task and
        500 when anything raises while parsing the id, fetching or
        serializing the task.
        """
        try:
            task = self.state_manager.get_task(uuid.UUID(task_id))

            if task is not None:
                resp.body = json.dumps(task.to_dict())
                resp.status = falcon.HTTP_200
                return

            not_found = "Task %s does not exist" % task_id
            self.info(req.context, not_found)
            self.return_error(
                resp, falcon.HTTP_404, message=not_found, retry=False)
        except Exception as ex:
            # Details go to the log only; the client gets a generic message.
            self.error(req.context, "Unknown error: %s" % (str(ex)))
            self.return_error(
                resp, falcon.HTTP_500, message="Unknown error", retry=False)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """Answer with the ``zenlines`` entry named by the ``q`` parameter.

        The reply is always plain text: 400 when ``q`` is missing or empty,
        404 when ``q`` is not a key of ``zenlines``, otherwise 200 with the
        stored value as the body.
        """
        key = req.get_param('q')

        resp.content_type = 'text/plain; charset=utf-8'

        if not key:
            text, status = 'Bad Request', falcon.HTTP_400
        else:
            try:
                text = zenlines[key]
            except KeyError:
                text, status = 'Not Found', falcon.HTTP_404
            else:
                status = falcon.HTTP_200

        resp.body = text
        resp.status = status
项目:djinn    作者:ordjinnization    | 项目源码 | 文件源码
def on_get(self, req, resp, project=None, repo=None):
        """Return results, optionally narrowed to a project or a repository.

        Responds 404 with an error payload when ``project`` is given but the
        database does not know it.  Otherwise the ``latest`` and
        ``weeks_ago`` query parameters select which results are fetched, and
        the formatted results are returned with a 200 status.
        """
        if project and not self.db.check_project_exists(project):
            resp.body = json.dumps({'Error': 'Project key {} not found!'.format(project)})
            resp.status = falcon.HTTP_404
            return

        latest = req.get_param_as_bool(name='latest', required=False)
        weeks_ago = req.get_param_as_int(name='weeks_ago', required=False)
        target_timestamp = get_epoch_time_of_weeks_ago(weeks=weeks_ago)

        # Most specific scope first; ``latest`` is not consulted for a repo.
        if project and repo:
            results = self.db.get_results_for_repo(reponame=repo, timestamp=target_timestamp)
        elif project and latest:
            results = self.db.get_latest_results_for_project(project=project)
        elif project:
            results = self.db.get_results_for_project(project=project, timestamp=target_timestamp)
        elif latest:
            results = self.db.get_latest_results()
        else:
            results = self.db.get_all_results(timestamp=target_timestamp)

        resp.body = json.dumps({'results': format_results(results)})
        resp.status = falcon.HTTP_200
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_delete(self):
        """
        Verify deleting a cluster.

        Covers a successful delete (200) and a delete whose store call
        raises ``etcd.EtcdKeyNotFound`` (404); both return ``'{}'``.
        """
        with mock.patch('cherrypy.engine.publish') as _publish:
            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            # Verify with proper deletion
            manager.get.return_value = mock.MagicMock()
            body = self.simulate_request(
                '/api/v0/cluster/development', method='DELETE')
            # Get is called to verify cluster exists
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            self.assertEqual('{}', body[0])

            # Verify when key doesn't exist
            manager.delete.side_effect = etcd.EtcdKeyNotFound
            body = self.simulate_request(
                '/api/v0/cluster/development', method='DELETE')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            self.assertEqual('{}', body[0])
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_hosts_retrieve(self):
        """
        Check the cluster host list endpoint for a found and a failing lookup.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            # A cluster returned by the store yields 200 and its host list.
            store.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
            response = self.simulate_request('/api/v0/cluster/cluster/hosts')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            returned_hosts = json.loads(response[0])
            self.assertEqual(['10.2.0.2'], returned_hosts)

            # A store lookup that raises yields 404 and an empty JSON object.
            store.get.side_effect = Exception
            response = self.simulate_request('/api/v0/cluster/bogus/hosts')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            returned = json.loads(response[0])
            self.assertEqual({}, returned)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_host_insert(self):
        """
        Check PUT of a host into a cluster for a found and a failing lookup.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            # With a cluster available the insert answers 200 and ``{}``.
            store.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
            response = self.simulate_request(
                '/api/v0/cluster/developent/hosts/10.2.0.3', method='PUT')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)

            # When the store lookup raises the insert answers 404 and ``{}``.
            store.get.side_effect = Exception
            response = self.simulate_request(
                '/api/v0/cluster/bogus/hosts/10.2.0.3', method='PUT')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_host_delete(self):
        """
        Check DELETE of a cluster host for a found and a failing lookup.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            # With a cluster available the delete answers 200 and ``{}``.
            store.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
            response = self.simulate_request(
                '/api/v0/cluster/development/hosts/10.2.0.2', method='DELETE')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)

            # When the store lookup raises the delete answers 404 and ``{}``.
            store.get.side_effect = Exception
            response = self.simulate_request(
                '/api/v0/cluster/bogus/hosts/10.2.0.2', method='DELETE')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_host_creds_retrieve(self):
        """
        Check the host credentials endpoint for a found and a failing lookup.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            # A host returned by the store yields 200 and its credentials.
            store.get.return_value = make_new(HOST)

            response = self.simulate_request('/api/v0/host/10.2.0.2/creds')
            self.assertEqual(self.srmock.status, falcon.HTTP_200)
            expected_creds = json.loads(HOST_CREDS_JSON)
            self.assertEqual(expected_creds, json.loads(response[0]))

            # A store lookup that raises yields 404 and an empty JSON object.
            store.reset_mock()
            store.get.side_effect = Exception

            response = self.simulate_request('/api/v0/host/10.9.9.9/creds')
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_network_delete(self):
        """
        Verify deleting a network.

        Covers a successful delete (200) and a delete whose store call
        raises ``etcd.EtcdKeyNotFound`` (404); both return ``'{}'``.
        """
        with mock.patch('cherrypy.engine.publish') as _publish:
            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            # Verify with proper deletion
            manager.get.return_value = mock.MagicMock()
            body = self.simulate_request(
                '/api/v0/network/development', method='DELETE')
            # Get is called to verify network exists
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            self.assertEqual('{}', body[0])

            # Verify when key doesn't exist
            manager.delete.side_effect = etcd.EtcdKeyNotFound
            body = self.simulate_request(
                '/api/v0/network/development', method='DELETE')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            self.assertEqual('{}', body[0])
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp, name):
        """
        Handles GET requests for Cluster hosts.

        Responds 404 when looking the cluster up in the store raises,
        otherwise 200 with the cluster's ``hostset`` serialized as JSON.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being requested.
        :type name: str
        """
        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            cluster = store_manager.get(Cluster.new(name=name))
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer reported as "not found".
            resp.status = falcon.HTTP_404
            return

        resp.body = json.dumps(cluster.hostset)
        resp.status = falcon.HTTP_200
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """
        Handles GET requests for Hosts.

        Responds 200 and stores the host listing in ``req.context['model']``.
        Responds 404 with a ``None`` model when the listing is empty or when
        fetching it raises.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        """

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            hosts = store_manager.list(Hosts(hosts=[]))
            if len(hosts.hosts) == 0:
                # Route the empty listing through the same 404 path below.
                raise Exception()
            resp.status = falcon.HTTP_200
            req.context['model'] = hosts
        except Exception:
            # This was originally a "no content" but I think a 404 makes
            # more sense if there are no hosts
            # ``warning`` replaces the deprecated ``warn`` alias
            # (assumes self.logger is a stdlib logging.Logger).
            self.logger.warning(
                'Store does not have any hosts. Returning [] and 404.')
            resp.status = falcon.HTTP_404
            req.context['model'] = None
            return
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """
        Handles GET requests for Networks.

        Responds 200 with a JSON list of network names.  Responds 404 with a
        ``None`` model in ``req.context`` when the listing is empty or when
        fetching or serializing it raises.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        """

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            networks = store_manager.list(Networks(networks=[]))
            if len(networks.networks) == 0:
                # Route the empty listing through the same 404 path below.
                raise Exception()
            resp.status = falcon.HTTP_200
            resp.body = json.dumps([
                network.name for network in networks.networks])
        except Exception:
            # ``warning`` replaces the deprecated ``warn`` alias
            # (assumes self.logger is a stdlib logging.Logger).
            self.logger.warning(
                'Store does not have any networks. Returning [] and 404.')
            resp.status = falcon.HTTP_404
            req.context['model'] = None
            return
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp, name):
        """
        Handles retrieval of an existing Network.

        Responds 200 and stores the network in ``req.context['model']``;
        responds 404 when looking it up in the store raises.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The friendly name of the network.
        :type name: str
        """
        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            network = store_manager.get(Network.new(name=name))
            resp.status = falcon.HTTP_200
            req.context['model'] = network
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer reported as "not found".
            resp.status = falcon.HTTP_404
            return
项目:CAL    作者:HPCC-Cloud-Computing    | 项目源码 | 文件源码
def test_second_hook_raise_HTTPNotFound(self):
        """A POST carrying these ``URL-METHODS`` headers must answer 404."""
        response = self.simulate_post(
            headers={
                'Content-Type': 'application/json',
                'URL-METHODS': 'GET, PUT',
            },
            body=self.body)
        self.assertEqual(falcon.HTTP_404, response.status)
项目:CAL    作者:HPCC-Cloud-Computing    | 项目源码 | 文件源码
def test_wrong_router(self):
        """A GET to ``/some/wrong/path`` must answer 404."""
        wrong_path = '/some/wrong/path'
        response = self.simulate_get(path=wrong_path, body=self.body)
        self.assertEqual(falcon.HTTP_404, response.status)
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_skip_process_resource(self):
        """Request a path with no route and check which context keys are set.

        Only ``/`` is routed, so ``/404`` must answer 404.  The module-level
        ``context`` is then expected to hold ``start_time`` and ``end_time``
        but not ``mid_time``, with ``req_succeeded`` falsy.
        """
        global context
        api = falcon.API(middleware=[RequestTimeMiddleware()])
        api.add_route('/', MiddlewareClassResource())

        result = testing.TestClient(api).simulate_request(path='/404')

        assert result.status == falcon.HTTP_404
        assert 'start_time' in context
        # presumably written by the process_resource hook, which is skipped
        # for unrouted paths -- confirm against RequestTimeMiddleware
        assert 'mid_time' not in context
        assert 'end_time' in context
        assert not context['req_succeeded']
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_404_without_body(self, client):
        """``NotFoundResource`` must answer 404 with empty content."""
        route = '/404'
        client.app.add_route(route, NotFoundResource())

        result = client.simulate_request(path=route)
        assert result.status == falcon.HTTP_404
        assert not result.content
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_404_with_body(self, client):
        """``NotFoundResourceWithBody`` must answer 404 with a JSON error body."""
        route = '/404'
        client.app.add_route(route, NotFoundResourceWithBody())

        result = client.simulate_request(path=route)
        assert result.status == falcon.HTTP_404
        assert result.content
        assert result.json == {
            u'title': u'404 Not Found',
            u'description': u'Not Found'
        }
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def on_get(self, req, resp, design_id, kind, name):
        """Handler for GET of a single part of a design.

        The ``source`` query parameter selects the design view: ``designed``
        (the default) or ``compiled``; any other value is rejected with 400.
        ``kind`` selects which getter is used on the design and ``name``
        identifies the part (``name`` is not used for kind ``Site``).

        Responds 404 for an unknown ``kind`` or when a ``DesignError`` is
        raised, and 500 for any other exception.

        :param req: Falcon request object
        :param resp: Falcon response object
        :param design_id: identifier of the design to read
        :param kind: kind of design part requested
        :param name: name of the design part requested
        """
        source = req.params.get('source', 'designed')

        if source not in ('compiled', 'designed'):
            # Previously fell through with ``design = None`` and failed on
            # the first attribute access.
            message = "Invalid source %s" % source
            self.error(req.context, message)
            self.return_error(
                resp, falcon.HTTP_400, message=message, retry=False)
            return

        try:
            if source == 'compiled':
                design = self.orchestrator.get_effective_site(design_id)
            else:
                design = self.orchestrator.get_described_site(design_id)

            part = None
            if kind == 'Site':
                part = design.get_site()
            elif kind == 'Network':
                part = design.get_network(name)
            elif kind == 'NetworkLink':
                part = design.get_network_link(name)
            elif kind == 'HardwareProfile':
                part = design.get_hardware_profile(name)
            elif kind == 'HostProfile':
                part = design.get_host_profile(name)
            elif kind == 'BaremetalNode':
                part = design.get_baremetal_node(name)
            else:
                self.error(req.context, "Kind %s unknown" % kind)
                self.return_error(
                    resp,
                    falcon.HTTP_404,
                    message="Kind %s unknown" % kind,
                    retry=False)
                return

            resp.body = json.dumps(part.obj_to_simple())
        except errors.DesignError as dex:
            self.error(req.context, str(dex))
            self.return_error(
                resp, falcon.HTTP_404, message=str(dex), retry=False)
        except Exception as exc:
            self.error(req.context, str(exc))
            # Fixed: the original passed ``resp.falcon.HTTP_500`` (a typo for
            # ``resp, falcon.HTTP_500``), which raised inside this handler.
            self.return_error(
                resp, falcon.HTTP_500, message=str(exc), retry=False)
项目:metricsandstuff    作者:bucknerns    | 项目源码 | 文件源码
def not_found(message="The requested resource does not exist"):
        """Raise ``falcon.HTTPNotFound`` with ``message`` as its description."""
        error = falcon.HTTPNotFound(description=message, code=falcon.HTTP_404)
        raise error
项目:plex-watched-sync    作者:fiLLLip    | 项目源码 | 文件源码
def on_get(self, req, resp, account_id):
        """Return the watched data and servers recorded for ``account_id``.

        Responds 404 when ``Actions.get_watched`` returns ``None``; otherwise
        200 with a JSON object holding the account id, watched data and
        servers.
        """
        watched = Actions.get_watched(account_id)
        if watched is not None:
            servers = Actions.get_servers(account_id)
            resp.status = falcon.HTTP_200  # This is the default status
            payload = {'account_id': account_id, 'watched': watched, 'servers': servers}
            resp.body = json.dumps(payload)
        else:
            resp.status = falcon.HTTP_404
项目:plex-watched-sync    作者:fiLLLip    | 项目源码 | 文件源码
def on_get(self, req, resp, server_id):
        """Return the accounts recorded for ``server_id``.

        Responds 404 when ``Actions.get_accounts`` returns ``None``;
        otherwise 200 with a JSON object holding the server id and accounts.
        """
        accounts = Actions.get_accounts(server_id)
        if accounts is not None:
            payload = {'server_id': server_id, 'accounts': accounts}
            resp.status = falcon.HTTP_200  # This is the default status
            resp.body = json.dumps(payload)
        else:
            resp.status = falcon.HTTP_404
项目:plex-watched-sync    作者:fiLLLip    | 项目源码 | 文件源码
def on_put(self, req, resp, server_id):
        """Handles PUT requests.

        The endpoint is currently disabled: it sets a 404 status and returns
        immediately.  Everything after that first ``return`` is unreachable
        and is kept only as the previous implementation, which read a JSON
        body, registered each listed account against ``server_id`` and
        answered with the resulting account list.
        """
        resp.status = falcon.HTTP_404 # Disabled!
        return

        # ---- unreachable below this line (endpoint disabled above) ----

        try:
            raw_json = req.stream.read()
        except Exception as ex:
            # NOTE(review): ``ex.message`` exists only on Python 2
            # exceptions -- revisit before re-enabling this code.
            raise falcon.HTTPError(falcon.HTTP_400,
                                   'Error',
                                   ex.message)

        try:
            # NOTE(review): the ``encoding`` keyword of json.loads was
            # removed in Python 3.9 -- revisit before re-enabling.
            result_json = json.loads(raw_json, encoding='utf-8')
        except ValueError:
            raise falcon.HTTPError(falcon.HTTP_400,
                                   'Malformed JSON',
                                   'Could not decode the request body. The '
                                   'JSON was incorrect.')

        # The lookup is used only as an existence check for the server.
        accounts = Actions.get_accounts(server_id)
        if accounts is None:
            resp.status = falcon.HTTP_404
            return

        # From here on ``accounts`` is the list supplied in the request body.
        accounts = result_json['accounts']
        for account_id in accounts:
            Actions.add_account(server_id, account_id)
            # presumably ``account`` is a sibling module with its own
            # Actions class -- it is not defined in this block; verify.
            account.Actions.add_server(account_id, server_id)

        resp.status = falcon.HTTP_200  # This is the default status
        jsonresp = {'server_id': server_id, 'accounts': Actions.get_accounts(server_id)}
        resp.body = json.dumps(jsonresp)
项目:djinn    作者:ordjinnization    | 项目源码 | 文件源码
def on_get(self, req, resp, project=None):
        """List a project's repositories, or all projects when none is given.

        Responds 404 with an error payload when ``project`` is given but the
        database does not know it; otherwise 200 with a JSON object keyed
        ``repositories`` or ``projects``.
        """
        if not project:
            payload = {'projects': self.db.get_projects()}
        elif self.db.check_project_exists(project):
            payload = {'repositories': self.db.get_repos_for_project(project)}
        else:
            resp.body = json.dumps({'Error': 'Project key {} not found!'.format(project)})
            resp.status = falcon.HTTP_404
            return

        resp.body = json.dumps(payload)
        resp.status = falcon.HTTP_200
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_clusters_listing_with_no_etcd_result(self):
        """
        Check the clusters listing answers 404 and ``'{}'`` when the
        published value is an etcd key-not-found result.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            no_result = [[[], etcd.EtcdKeyNotFound()]]
            publish_mock.return_value = no_result

            response = self.simulate_request('/api/v0/clusters')
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            self.assertEqual('{}', response[0])
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_retrieve(self):
        """
        Check the single-cluster endpoint for a found and a failing lookup.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            expected_cluster = make_new(CLUSTER_WITH_HOST)
            # The store hands back the cluster and a host listing.
            store.get.return_value = expected_cluster
            store.list.return_value = make_new(HOSTS)

            response = self.simulate_request('/api/v0/cluster/development')
            self.assertEqual(self.srmock.status, falcon.HTTP_200)

            expected_payload = json.loads(
                expected_cluster.to_json_with_hosts())
            self.assertEqual(expected_payload, json.loads(response[0]))

            # A store lookup that raises yields 404 and an empty JSON object.
            store.get.reset_mock()
            store.get.side_effect = Exception

            response = self.simulate_request('/api/v0/cluster/bogus')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_hosts_listing_with_no_hosts(self):
        """
        Check the hosts listing answers 404 and an empty JSON object when
        the published value is an empty ``Hosts`` model.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            empty_hosts = Hosts(hosts=[])
            publish_mock.return_value = empty_hosts
            response = self.simulate_request('/api/v0/hosts')
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_hosts_listing_with_no_etcd_result(self):
        """
        Check the hosts listing answers 404 and ``'{}'`` when the published
        value carries no usable store result.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            no_result = [[[], Exception]]
            publish_mock.return_value = no_result
            response = self.simulate_request('/api/v0/hosts')
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            self.assertEqual('{}', response[0])
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_host_delete(self):
        """
        Verify deleting a Host.

        Covers a successful delete (200) and a delete whose store call
        raises (404); in both cases the store's ``delete`` must have been
        called exactly once and the body must be an empty JSON object.
        """
        with mock.patch('cherrypy.engine.publish') as _publish:
            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            # Verify if the host exists the data is returned
            manager.get.return_value = make_new(HOST)

            # Verify deleting of an existing host works
            body = self.simulate_request(
                '/api/v0/host/10.2.0.2', method='DELETE')
            # datasource's delete should have been called once
            # assertEqual replaces the deprecated assertEquals alias.
            self.assertEqual(1, manager.delete.call_count)
            self.assertEqual(self.srmock.status, falcon.HTTP_200)
            self.assertEqual({}, json.loads(body[0]))

            # Verify deleting of a non existing host returns the proper result
            manager.reset_mock()
            manager.delete.side_effect = Exception
            body = self.simulate_request(
                '/api/v0/host/10.9.9.9', method='DELETE')
            self.assertEqual(1, manager.delete.call_count)
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            self.assertEqual({}, json.loads(body[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_networks_listing_with_no_networks(self):
        """
        Check the networks listing answers 404 and an empty JSON object when
        the store lists no networks.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            store.list.return_value = networks.Networks(networks=[])
            publish_mock.return_value = [store]

            response = self.simulate_request('/api/v0/networks')
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_networks_listing_with_no_etcd_result(self):
        """
        Check the networks listing answers 404 and ``'{}'`` when the
        published value is an etcd key-not-found result.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            no_result = [[[], etcd.EtcdKeyNotFound()]]
            publish_mock.return_value = no_result

            response = self.simulate_request('/api/v0/networks')
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            self.assertEqual('{}', response[0])
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_network_retrieve(self):
        """
        Check the single-network endpoint for a found and a failing lookup.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            expected_network = networks.Network.new(name='default')
            # A network returned by the store yields 200 and its JSON form.
            store.get.return_value = expected_network

            response = self.simulate_request('/api/v0/network/default')
            self.assertEqual(self.srmock.status, falcon.HTTP_200)

            expected_payload = json.loads(expected_network.to_json())
            self.assertEqual(expected_payload, json.loads(response[0]))

            # A store lookup that raises yields 404 and an empty JSON object.
            store.get.reset_mock()
            store.get.side_effect = Exception

            response = self.simulate_request('/api/v0/network/bogus')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            payload = json.loads(response[0])
            self.assertEqual({}, payload)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp, name):
        """
        Handles retrieval of an existing Cluster.

        Responds 404 when the store lookup raises (the error is logged) or
        returns a falsy cluster; otherwise 200 with the cluster serialized
        together with its hosts.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being requested.
        :type name: str
        """
        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            cluster = store_manager.get(Cluster.new(name=name))
        except Exception as error:
            self.logger.error("{0}: {1}".format(type(error), error))
            # Fall through to the shared "not found" branch below.
            cluster = None

        if not cluster:
            resp.status = falcon.HTTP_404
            return

        self._calculate_hosts(cluster)
        # The body is set here explicitly so that it includes the hosts.
        serialized = cluster.to_json_with_hosts()
        resp.body = serialized
        resp.status = falcon.HTTP_200
        self.logger.debug('Cluster retrieval: {0}'.format(resp.body))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp, name, address):
        """
        Handles GET requests for individual hosts in a Cluster.
        This is a membership test, returning 200 OK if the host
        address is part of the cluster, or else 404 Not Found.
        A 404 is also returned when the cluster lookup itself raises.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being requested.
        :type name: str
        :param address: The address of the Host being requested.
        :type address: str
        """
        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            cluster = store_manager.get(Cluster.new(name=name))
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer reported as "not found".
            resp.status = falcon.HTTP_404
            return

        if address in cluster.hostset:
            resp.status = falcon.HTTP_200
        else:
            resp.status = falcon.HTTP_404
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp, name):
        """
        Handles GET (or "status") requests for a tree image deployment
        across a Cluster.

        Responds 404 when ``util.etcd_cluster_exists`` reports no such
        cluster, 204 when fetching the ClusterDeploy record raises, and
        otherwise 200 with the record stored in ``req.context['model']``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster undergoing deployment.
        :type name: str
        """
        if not util.etcd_cluster_exists(name):
            self.logger.info(
                'Deploy GET requested for nonexistent cluster {0}'.format(
                    name))
            resp.status = falcon.HTTP_404
            return

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            cluster_deploy = store_manager.get(ClusterDeploy.new(name=name))
            self.logger.debug('Found ClusterDeploy for {0}'.format(name))
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are not turned into a 204 response.
            #
            # Return "204 No Content" if we have no status,
            # meaning no deployment is in progress.  The client
            # can't be expected to know that, so it's not a
            # client error (4xx).
            self.logger.debug((
                'Deploy GET requested for {0} but no deployment '
                'has ever been executed.').format(name))

            resp.status = falcon.HTTP_204
            return

        resp.status = falcon.HTTP_200
        req.context['model'] = cluster_deploy
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp, name):
        """
        Handles GET (or "status") requests for a Cluster upgrade.

        Responds with 404 when ``util.etcd_cluster_exists`` reports no
        cluster of that name, with 204 when no ClusterUpgrade record can
        be loaded from the store, and with 200 otherwise, in which case
        the record is placed in ``req.context['model']``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being upgraded.
        :type name: str
        """
        if not util.etcd_cluster_exists(name):
            self.logger.info(
                'Upgrade GET requested for nonexistent cluster {0}'.format(
                    name))
            resp.status = falcon.HTTP_404
            return

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            cluster_upgrade = store_manager.get(ClusterUpgrade.new(name=name))
        except Exception:
            # Return "204 No Content" if we have no status,
            # meaning no upgrade is in progress.  The client
            # can't be expected to know that, so it's not a
            # client error (4xx).
            # ``Exception`` (not a bare ``except``) so that
            # KeyboardInterrupt/SystemExit are not reported as a 204.
            self.logger.debug((
                'Upgrade GET requested for {0} but no upgrade '
                'has ever been executed.').format(name))

            resp.status = falcon.HTTP_204
            return

        self.logger.debug('Found ClusterUpgrade for {0}'.format(name))
        resp.status = falcon.HTTP_200
        req.context['model'] = cluster_upgrade
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp, address):
        """
        Handles retrieval of existing Host credentials.

        On success the response is 200 with a JSON object holding
        ``ssh_priv_key`` and ``remote_user`` (``'root'`` when the Host
        has no remote user set).  Any failure while loading the Host or
        building the body results in a 404.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param address: The address of the Host being requested.
        :type address: str
        """
        # TODO: Verify input
        # TODO: Decide if this should be a model or if it makes sense to
        #       stay a subset off of Host and bypass the req.context
        #       middleware system.
        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            host = store_manager.get(Host.new(address=address))
            body = {
                'ssh_priv_key': host.ssh_priv_key,
                'remote_user': host.remote_user or 'root',
            }
            resp.body = json.dumps(body)
        except Exception:
            # ``Exception`` (not a bare ``except``) so that
            # KeyboardInterrupt/SystemExit are not reported as a 404.
            resp.status = falcon.HTTP_404
            return

        # Only report success once the body has actually been built.
        resp.status = falcon.HTTP_200
项目:ops    作者:xiaomatech    | 项目源码 | 文件源码
def sink(req, resp):
    """
    Catch-all responder that dispatches a request to a controller
    function chosen from the URL path.

    The non-empty path segments select the target: with two or more
    segments the first is the controller name and the second the
    function name; with one segment it names a function on the
    ``index`` controller; with none, ``index``/``index`` is used.

    Responds with 404 when the controller cannot be loaded, when it has
    no attribute of the requested name, or when the requested name
    starts with an underscore.  If the function leaves ``resp.body``
    unset, its return value becomes the body (text as-is, anything else
    JSON-encoded).  An exception raised while dispatching results in a
    500 carrying the exception text.

    :param req: Request whose ``path`` selects the controller function.
    :param resp: Response whose ``status`` and ``body`` are filled in.
    """
    do_log_history(req, resp)

    # Build a real list: ``len()`` and indexing below must not depend on
    # ``filter`` returning a list.
    paths = [segment for segment in req.path.split('/') if segment]
    ctrl_name = func_name = 'index'
    if len(paths) >= 2:
        ctrl_name, func_name = paths[0], paths[1]
    elif len(paths) == 1:
        func_name = paths[0]
    ctrl = loader.ctrl(ctrl_name)

    # func_name comes straight from the URL, so never dispatch to
    # private or dunder attributes (e.g. ``__class__``) of a controller.
    dispatchable = (
        ctrl is not None
        and not func_name.startswith('_')
        and hasattr(ctrl, func_name)
    )

    if not dispatchable:
        resp.status = falcon.HTTP_404
        resp.body = "Not Found"
    else:
        try:
            content = getattr(ctrl, func_name)(req, resp)
            if resp.body is None:
                if isinstance(content, unicode):
                    resp.body = content.encode('utf-8', 'ignore')
                elif isinstance(content, str):
                    resp.body = content
                else:
                    resp.body = json.dumps(content)
        except Exception as ex:
            log_error(ex)
            resp.status = falcon.HTTP_500
            # NOTE(review): this sends the raw exception text to the
            # client; consider a generic message such as
            # 'A server error occurred. Please contact the administrator'.
            resp.body = str(ex)
    do_log_result(req, resp)
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_custom_router_find_should_be_used():
    """Check that an API built with a custom router uses its ``find()``.

    The router returns four-element results (with and without a URI
    template), a three-element result, a ``(None, None, None)`` result
    and a bare ``None``; the last two must both produce an empty 404.
    """

    def resource(req, resp, **kwargs):
        # Echo req.uri_template back so the assertions can inspect it.
        template = req.uri_template
        resp.body = '{{"uri_template": "{0}"}}'.format(template)

    class CustomRouter(object):
        def __init__(self):
            self.reached_backwards_compat = False

        def find(self, uri):
            responders = {'GET': resource}
            routes = {
                '/test/42': (resource, responders, {}, '/test/{id}'),
                '/test/42/no-uri-template': (resource, responders, {}, None),
                # Three-element result: no URI template slot at all.
                '/test/42/uri-template/backwards-compat': (
                    resource, responders, {}),
            }
            if uri in routes:
                return routes[uri]

            if uri == '/404/backwards-compat':
                self.reached_backwards_compat = True
                return (None, None, None)

            return None

    router = CustomRouter()
    client = testing.TestClient(falcon.API(router=router))

    expected_bodies = (
        ('/test/42', b'{"uri_template": "/test/{id}"}'),
        ('/test/42/no-uri-template', b'{"uri_template": "None"}'),
        ('/test/42/uri-template/backwards-compat',
         b'{"uri_template": "None"}'),
    )
    for path, expected in expected_bodies:
        response = client.simulate_request(path=path)
        assert response.content == expected

    for uri in ('/404', '/404/backwards-compat'):
        not_found = client.simulate_request(path=uri)
        assert not not_found.content
        assert not_found.status == falcon.HTTP_404

    assert router.reached_backwards_compat
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def on_get(self, req, resp, design_id):
        """Method handler for GET of the part catalog of a design.

        Responds with a JSON list of ``{'kind': ..., 'key': ...}``
        entries: the design's site as ``Region``, followed by its
        networks, network links, host profiles, hardware profiles and
        baremetal nodes, in that order.  Responds with 404 when the state
        manager raises ``errors.DesignError`` for the design.

        :param req: Falcon request object
        :param resp: Falcon response object
        :param design_id: Identifier of the design to list parts for
        """
        try:
            design = self.state_manager.get_design(design_id)
        except errors.DesignError:
            self.return_error(
                resp,
                falcon.HTTP_404,
                message="Design %s not found" % design_id,
                retry=False)
            # Stop here: ``design`` is unbound, so falling through would
            # raise UnboundLocalError instead of delivering the 404.
            return

        part_catalog = [{'kind': 'Region', 'key': design.get_site().get_id()}]

        # (kind label, parts) pairs, in the order they appear in the output.
        part_groups = (
            ('Network', design.networks),
            ('NetworkLink', design.network_links),
            ('HostProfile', design.host_profiles),
            ('HardwareProfile', design.hardware_profiles),
            ('BaremetalNode', design.baremetal_nodes),
        )
        for kind, parts in part_groups:
            part_catalog.extend(
                {'kind': kind, 'key': part.get_id()} for part in parts)

        resp.body = json.dumps(part_catalog)
        resp.status = falcon.HTTP_200
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_hosts_overwrite(self):
        """
        Verify overwriting a cluster host list.

        Issues PUT requests against the cluster hosts endpoint with the
        store manager mocked out, checking the status and the empty JSON
        body for each case.
        """
        dev_hosts_url = '/api/v0/cluster/development/hosts'
        full_payload = (
            '{"old": ["10.2.0.2"], "new": ["10.2.0.2", "10.2.0.3"]}')

        with mock.patch('cherrypy.engine.publish') as _publish:
            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            def put_and_check(url, payload, expected_status):
                # Send the PUT, then check the status and that the
                # response body decodes to an empty JSON object.
                result = self.simulate_request(
                    url, method='PUT', body=payload)
                self.assertEqual(expected_status, self.srmock.status)
                self.assertEqual({}, json.loads(result[0]))

            # Verify setting host list works with a proper request
            manager.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
            put_and_check(dev_hosts_url, full_payload, falcon.HTTP_200)

            # Verify bad request (KeyError) returns the proper result
            manager.get.side_effect = KeyError
            put_and_check(
                dev_hosts_url,
                '{"new": ["10.2.0.2", "10.2.0.3"]}',
                falcon.HTTP_400)

            # Verify bad request (TypeError) returns the proper result
            manager.get.side_effect = TypeError
            put_and_check(
                dev_hosts_url,
                '["10.2.0.2", "10.2.0.3"]',
                falcon.HTTP_400)

            # Verify bad cluster name returns the proper result
            manager.get.side_effect = Exception
            put_and_check(
                '/api/v0/cluster/bogus/hosts',
                full_payload,
                falcon.HTTP_404)

            # Verify host list conflict returns the proper result
            manager.get.side_effect = None
            put_and_check(
                dev_hosts_url,
                '{"old": [], "new": ["10.2.0.2", "10.2.0.3"]}',
                falcon.HTTP_409)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_put(self, req, resp, name):
        """
        Handles PUT requests for Cluster hosts.
        This replaces the entire host list for a Cluster.

        The request body must be a JSON object with ``old`` and ``new``
        host lists.  Responds with 400 when the body cannot be parsed or
        lacks those keys, 404 when the Cluster cannot be loaded, 409
        when ``old`` does not match the Cluster's current hosts, and 200
        once the new host list has been saved.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being requested.
        :type name: str
        """
        # Pre-bind so the log line below works even when parsing fails.
        req_body = None
        try:
            req_body = json.loads(req.stream.read().decode())
            old_hosts = set(req_body['old'])  # Ensures no duplicates
            new_hosts = set(req_body['new'])  # Ensures no duplicates
        except (KeyError, TypeError, ValueError):
            # ValueError covers malformed JSON and undecodable bytes,
            # which are client errors just like a missing key.
            self.logger.info(
                'Bad client PUT request for cluster "{0}": {1}'.
                format(name, req_body))
            resp.status = falcon.HTTP_400
            return

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            cluster = store_manager.get(Cluster.new(name=name))
        except Exception:
            # ``Exception`` (not a bare ``except``) so that
            # KeyboardInterrupt/SystemExit are not reported as a 404.
            resp.status = falcon.HTTP_404
            return

        # old_hosts must match current hosts to accept new_hosts.
        if old_hosts != set(cluster.hostset):
            self.logger.info(
                'Conflict setting hosts for cluster {0}'.format(name))
            self.logger.debug('{0} != {1}'.format(old_hosts, cluster.hostset))
            resp.status = falcon.HTTP_409
            return

        # FIXME: Need input validation.  For each new host,
        #        - Does the host exist at /commissaire/hosts/{IP}?
        #        - Does the host already belong to another cluster?

        # FIXME: Should guard against races here, since we're fetching
        #        the cluster record and writing it back with some parts
        #        unmodified.  Use either locking or a conditional write
        #        with the etcd 'modifiedIndex'.  Deferring for now.

        cluster.hostset = list(new_hosts)
        store_manager.save(cluster)
        resp.status = falcon.HTTP_200
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_put(self, req, resp, name):
        """
        Handles PUT (or "initiate") requests for a Cluster restart.

        Responds with 404 when ``util.etcd_cluster_exists`` reports no
        cluster of that name.  When a stored ClusterRestart without a
        ``finished_at`` value is found, responds with 200 and that
        record.  Otherwise starts ``clusterexec`` in a new Process,
        saves a fresh ClusterRestart and responds with 201.  In both
        success cases the record is placed in ``req.context['model']``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being restarted.
        :type name: str
        """
        # Make sure the cluster name is valid.
        if not util.etcd_cluster_exists(name):
            self.logger.info(
                'Restart PUT requested for nonexistent cluster {0}'.format(
                    name))
            resp.status = falcon.HTTP_404
            return

        # One lookup serves the status check, the worker's clone and
        # the final save.
        store_manager = cherrypy.engine.publish('get-store-manager')[0]

        # If the operation is already in progress, return the current
        # status with response code 200 OK.
        try:
            cluster_restart = store_manager.get(ClusterRestart.new(name=name))
            self.logger.debug('Found a ClusterRestart for {0}'.format(name))
            if not cluster_restart.finished_at:
                self.logger.debug(
                    'Cluster {0} restart already in progress'.format(name))
                resp.status = falcon.HTTP_200
                req.context['model'] = cluster_restart
                return
        except Exception:
            # This means one doesn't already exist.  ``Exception`` (not a
            # bare ``except``) so KeyboardInterrupt/SystemExit propagate.
            pass

        # TODO: Move to a poll?
        args = (store_manager.clone(), name, 'restart')
        p = Process(target=clusterexec, args=args)
        p.start()

        self.logger.debug('Started restart in clusterexecpool for {0}'.format(
            name))

        cluster_restart = ClusterRestart.new(
            name=name,
            status='in_process',
            started_at=datetime.datetime.utcnow().isoformat()
        )
        store_manager.save(cluster_restart)
        resp.status = falcon.HTTP_201
        req.context['model'] = cluster_restart
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_put(self, req, resp, name):
        """
        Handles PUT (or "initiate") requests for a Cluster upgrade.

        Responds with 404 when ``util.etcd_cluster_exists`` reports no
        cluster of that name.  When a stored ClusterUpgrade without a
        ``finished_at`` value is found, responds with 200 and that
        record.  Otherwise starts ``clusterexec`` in a new Process,
        saves a fresh ClusterUpgrade and responds with 201.  In both
        success cases the record is placed in ``req.context['model']``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being upgraded.
        :type name: str
        """
        # Make sure the cluster name is valid.
        if not util.etcd_cluster_exists(name):
            self.logger.info(
                'Upgrade PUT requested for nonexistent cluster {0}'.format(
                    name))
            resp.status = falcon.HTTP_404
            return

        # One lookup serves the status check, the worker's clone and
        # the final save.
        store_manager = cherrypy.engine.publish('get-store-manager')[0]

        # If the operation is already in progress, return the current
        # status with response code 200 OK.
        try:
            cluster_upgrade = store_manager.get(ClusterUpgrade.new(name=name))
            self.logger.debug('Found ClusterUpgrade for {0}'.format(name))
            if not cluster_upgrade.finished_at:
                self.logger.debug(
                    'Cluster {0} upgrade already in progress'.format(name))
                resp.status = falcon.HTTP_200
                req.context['model'] = cluster_upgrade
                return
        except Exception:
            # This means one doesn't already exist.  ``Exception`` (not a
            # bare ``except``) so KeyboardInterrupt/SystemExit propagate.
            pass

        # TODO: Move to a poll?
        args = (store_manager.clone(), name, 'upgrade')
        p = Process(target=clusterexec, args=args)
        p.start()

        self.logger.debug('Started upgrade in clusterexecpool for {0}'.format(
            name))
        cluster_upgrade = ClusterUpgrade.new(
            name=name,
            status='in_process',
            started_at=datetime.datetime.utcnow().isoformat()
        )

        store_manager.save(cluster_upgrade)
        resp.status = falcon.HTTP_201
        req.context['model'] = cluster_upgrade
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_delete(self, req, resp, address):
        """
        Handles the Deletion of a Host.

        The Host is dequeued from the watcher and deleted from the
        store; the response is 200 when both succeed and 404 otherwise.
        In either case the address is then removed, on a best-effort
        basis, from every Cluster hostset that contains it.  The
        response body is always ``{}``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param address: The address of the Host being requested.
        :type address: str
        """
        resp.body = '{}'
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        try:
            host = Host.new(address=address)
            WATCHER_QUEUE.dequeue(host)
            store_manager.delete(host)
            self.logger.debug(
                'Deleted host {0} and dequeued it from the watcher.'.format(
                    host.address))
            resp.status = falcon.HTTP_200
        except Exception:
            # ``Exception`` (not a bare ``except``) so that
            # KeyboardInterrupt/SystemExit are not reported as a 404.
            resp.status = falcon.HTTP_404

        # Also remove the host from all clusters.
        # Note: We've done all we need to for the host deletion,
        #       so if an error occurs from here just log it and
        #       return.
        try:
            clusters = store_manager.list(Clusters(clusters=[]))
        except Exception:
            self.logger.warning('Store does not have any clusters')
            return
        for cluster in clusters.clusters:
            try:
                self.logger.debug(
                    'Checking cluster {0}'.format(cluster.name))
                if address in cluster.hostset:
                    self.logger.info(
                        'Removing {0} from cluster {1}'.format(
                            address, cluster.name))
                    cluster.hostset.remove(address)
                    store_manager.save(cluster)
                    self.logger.info(
                        '{0} has been removed from cluster {1}'.format(
                            address, cluster.name))
            except Exception:
                # Best effort: one failing cluster must not stop the
                # clean-up of the remaining ones.
                self.logger.warning(
                    'Failed to remove {0} from cluster {1}'.format(
                        address, cluster.name))