Python falcon 模块,HTTP_200 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用falcon.HTTP_200

项目:gyr    作者:non-Jedi    | 项目源码 | 文件源码
def on_put(self, request, response, txn_id=None):
        """Handle a PUT request whose JSON body carries an ``events`` key.

        Replies 200 with an empty JSON object when ``txn_id`` is not new or
        when ``self.handler`` returns a truthy value. Replies 400 when the
        body cannot be decoded/parsed, lacks ``events``, or the handler
        returns a falsy value.
        """
        response.body = "{}"

        # A transaction id that was already seen is acknowledged as-is.
        if not self._is_new(txn_id):
            response.status = falcon.HTTP_200
            return

        request.context["body"] = request.stream.read()
        try:
            events = json.loads(request.context["body"].decode("utf-8"))["events"]
        except (KeyError, TypeError, ValueError, UnicodeDecodeError):
            # TypeError covers valid JSON whose top level is not an object
            # (list, string, number, null) and so cannot be indexed by key.
            response.status = falcon.HTTP_400
            response.body = "Malformed request body"
            return

        if self.handler(EventStream(events, self.Api)):
            response.status = falcon.HTTP_200
        else:
            response.status = falcon.HTTP_400
项目:antenna    作者:mozilla-services    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """Run every available health check and report the combined result.

        Each resource returned by ``self.antenna_app.get_resources()`` that
        has a ``check_health`` attribute receives the shared ``HealthState``.
        The response is 200 when the state reports healthy, 503 otherwise,
        and the body is the JSON dump of the state.
        """
        mymetrics.incr('heartbeat.count')
        health = HealthState()

        # Lazily pick out the resources that can report on their health so
        # each one is inspected and then checked in turn.
        checkable = (
            res for res in self.antenna_app.get_resources()
            if hasattr(res, 'check_health')
        )
        for res in checkable:
            res.check_health(health)

        # Emit one gauge per statsd entry gathered during the checks.
        for name, reading in health.statsd.items():
            mymetrics.gauge(name, value=reading)

        resp.status = falcon.HTTP_200 if health.is_healthy() else falcon.HTTP_503
        resp.body = json.dumps(health.to_dict())
项目:spacy-services    作者:explosion    | 项目源码 | 文件源码
def on_get(self, req, resp, model_name):
        """Return the dependency, entity and POS label sets of a model.

        Args:
            req: Falcon request (unused).
            resp: Falcon response; receives the JSON body, a CORS header
                and status 200 on success.
            model_name: Name passed to ``get_model``.

        Raises:
            falcon.HTTPBadRequest: If loading the model or building the
                schema raises any exception.
        """
        try:
            model = get_model(model_name)
            output = {
                'dep_types': get_dep_types(model),
                'ent_types': get_ent_types(model),
                'pos_types': get_pos_types(model)
            }

            resp.body = json.dumps(output, sort_keys=True, indent=2)
            # NOTE(review): 'text/string' is not a registered media type;
            # left unchanged because existing clients may rely on it.
            resp.content_type = 'text/string'
            resp.append_header('Access-Control-Allow-Origin', "*")
            resp.status = falcon.HTTP_200
        except Exception as e:
            # The raised HTTP error determines the response status; the
            # former status assignment after this raise was unreachable.
            raise falcon.HTTPBadRequest(
                'Schema construction failed',
                '{}'.format(e))
项目:spacy-services    作者:explosion    | 项目源码 | 文件源码
def on_post(self, req, resp):
        """Parse the posted text and return the dependency parse as JSON.

        The request body is a JSON object with ``text`` and the optional
        keys ``model`` (default ``'en'``), ``collapse_punctuation`` and
        ``collapse_phrases`` (both default ``True``).

        Raises:
            falcon.HTTPBadRequest: If the body is not decodable JSON, or if
                loading the model or parsing raises any exception.
        """
        req_body = req.stream.read()
        try:
            json_data = json.loads(req_body.decode('utf8'))
        except ValueError as e:
            # Covers both invalid UTF-8 (UnicodeDecodeError is a ValueError)
            # and invalid JSON, which previously escaped as an unhandled
            # exception rather than a client error.
            raise falcon.HTTPBadRequest(
                'Invalid JSON body',
                '{}'.format(e))
        text = json_data.get('text')
        model_name = json_data.get('model', 'en')
        collapse_punctuation = json_data.get('collapse_punctuation', True)
        collapse_phrases = json_data.get('collapse_phrases', True)

        try:
            model = get_model(model_name)
            parse = Parse(model, text, collapse_punctuation, collapse_phrases)
            resp.body = json.dumps(parse.to_json(), sort_keys=True, indent=2)
            # NOTE(review): 'text/string' is not a registered media type;
            # left unchanged because existing clients may rely on it.
            resp.content_type = 'text/string'
            resp.append_header('Access-Control-Allow-Origin', "*")
            resp.status = falcon.HTTP_200
        except Exception as e:
            # The raised HTTP error determines the response status; the
            # former status assignment after this raise was unreachable.
            raise falcon.HTTPBadRequest(
                'Dependency parsing failed',
                '{}'.format(e))
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def on_post(self, req, resp):
        """Validate the manifest documents in the request and report the result.

        On success the response is 200 with a JSON body ``{"valid": ...}``.
        Any exception is logged through ``self.error`` and turned into a
        400 via ``self.return_error``.
        """
        try:
            documents = list(self.req_yaml(req))
            verdict = validate_armada_documents(documents)

            resp.status = falcon.HTTP_200
            resp.body = json.dumps({'valid': verdict})
            resp.content_type = 'application/json'

        except Exception:
            failure = 'Failed to validate Armada Manifest'
            self.error(req.context, failure)
            self.return_error(resp, falcon.HTTP_400, message=failure)
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_raise_status_in_process_request(self):
        """An HTTPStatus raised in ``process_request`` shapes the response."""
        class TestMiddleware:
            def process_request(self, req, resp):
                raise HTTPStatus(
                    falcon.HTTP_200,
                    headers={'X-Failed': 'False'},
                    body='Pass',
                )

        api = falcon.API(middleware=TestMiddleware())
        api.add_route('/status', TestHookResource())
        test_client = testing.TestClient(api)

        result = test_client.simulate_request(path='/status', method='GET')

        # Status, header and body must all come from the raised HTTPStatus.
        assert result.status == falcon.HTTP_200
        assert result.headers['x-failed'] == 'False'
        assert result.text == 'Pass'
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_raise_status_in_process_resource(self):
        """An HTTPStatus raised in ``process_resource`` shapes the response."""
        class TestMiddleware:
            def process_resource(self, req, resp, resource, params):
                raise HTTPStatus(
                    falcon.HTTP_200,
                    headers={'X-Failed': 'False'},
                    body='Pass',
                )

        api = falcon.API(middleware=TestMiddleware())
        api.add_route('/status', TestHookResource())
        test_client = testing.TestClient(api)

        result = test_client.simulate_request(path='/status', method='GET')

        # Status, header and body must all come from the raised HTTPStatus.
        assert result.status == falcon.HTTP_200
        assert result.headers['x-failed'] == 'False'
        assert result.text == 'Pass'
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_raise_status_runs_process_response(self):
        """``process_response`` must still run and may rewrite the response."""
        class TestMiddleware:
            def process_response(self, req, resp, response):
                # Overwrite whatever the resource produced.
                resp.status = falcon.HTTP_200
                resp.set_header('X-Failed', 'False')
                resp.body = 'Pass'

        api = falcon.API(middleware=TestMiddleware())
        api.add_route('/status', TestHookResource())
        test_client = testing.TestClient(api)

        result = test_client.simulate_request(path='/status', method='GET')

        assert result.status == falcon.HTTP_200
        assert result.headers['x-failed'] == 'False'
        assert result.text == 'Pass'
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_log_get_request(self):
        """The request-time middleware records all three timestamps."""
        global context
        api = falcon.API(middleware=[RequestTimeMiddleware()])
        api.add_route(TEST_ROUTE, MiddlewareClassResource())
        test_client = testing.TestClient(api)

        result = test_client.simulate_request(path=TEST_ROUTE)
        assert _EXPECTED_BODY == result.json
        assert result.status == falcon.HTTP_200

        # Every middleware hook must have left its timestamp behind.
        for stamp in ('start_time', 'mid_time', 'end_time'):
            assert stamp in context

        started = context['start_time']
        assert context['mid_time'] >= started, \
            'process_resource not executed after request'
        assert context['end_time'] >= started, \
            'process_response not executed after request'

        assert context['req_succeeded']
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_generate_trans_id_and_time_with_request(self):
        """Transaction-id and request-time middleware both populate context."""
        global context
        stack = [TransactionIdMiddleware(), RequestTimeMiddleware()]
        api = falcon.API(middleware=stack)
        api.add_route(TEST_ROUTE, MiddlewareClassResource())
        test_client = testing.TestClient(api)

        result = test_client.simulate_request(path=TEST_ROUTE)
        assert _EXPECTED_BODY == result.json
        assert result.status == falcon.HTTP_200

        assert 'transaction_id' in context
        assert 'unique-req-id' == context['transaction_id']

        # Every timing hook must have left its timestamp behind.
        for stamp in ('start_time', 'mid_time', 'end_time'):
            assert stamp in context

        started = context['start_time']
        assert context['mid_time'] >= started, \
            'process_resource not executed after request'
        assert context['end_time'] >= started, \
            'process_response not executed after request'
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_independent_middleware_execution_order(self):
        """Request/resource hooks run first-to-last, response hooks reversed."""
        global context
        stack = [ExecutedFirstMiddleware(), ExecutedLastMiddleware()]
        api = falcon.API(independent_middleware=True, middleware=stack)
        api.add_route(TEST_ROUTE, MiddlewareClassResource())
        test_client = testing.TestClient(api)

        result = test_client.simulate_request(path=TEST_ROUTE)
        assert _EXPECTED_BODY == result.json
        assert result.status == falcon.HTTP_200

        # The recorded calls are kept in a list, so comparing against this
        # list checks the execution order as well as the set of calls.
        expected_calls = [
            'ExecutedFirstMiddleware.process_request',
            'ExecutedLastMiddleware.process_request',
            'ExecutedFirstMiddleware.process_resource',
            'ExecutedLastMiddleware.process_resource',
            'ExecutedLastMiddleware.process_response',
            'ExecutedFirstMiddleware.process_response',
        ]
        assert expected_calls == context['executed_methods']
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def test_single_trailing_slash(client):
    """A single trailing slash on the route or the request is tolerated."""
    # (route template, requested path, path the resource should observe)
    scenarios = (
        ('/1/{id}/', '/1/123', '/1/123'),
        ('/2/{id}/', '/2/123/', '/2/123'),
        ('/3/{id}', '/3/123/', '/3/123'),
    )

    for template, requested, observed in scenarios:
        resource = IDResource()
        client.app.add_route(template, resource)
        result = client.simulate_get(requested)
        assert result.status == falcon.HTTP_200
        assert resource.called
        assert resource.id == '123'
        assert resource.req.path == observed
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def on_get(self, req, resp, **kwargs):
        """Record the call and reply with the canned sample response.

        Args:
            req: Falcon ``Request`` instance.
            resp: Falcon ``Response`` instance.
            kwargs: Keyword arguments passed to the responder.

        """

        # The resource instance is shared between requests, so these
        # attributes only reflect the most recent call.
        self.req = req
        self.resp = resp
        self.kwargs = kwargs
        self.called = True

        resp.status = falcon.HTTP_200
        resp.body = self.sample_body
        resp.set_headers(self.resp_headers)
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """Return the keys of ``self.state_manager.designs`` as a JSON list.

        :param req: Falcon request object
        :param resp: Falcon response object
        """
        state = self.state_manager

        try:
            design_keys = list(state.designs.keys())
            resp.body = json.dumps(design_keys)
            resp.status = falcon.HTTP_200
        except Exception as ex:
            # Log the failure, then hand the client a retryable 500.
            detail = "Exception raised: %s" % str(ex)
            self.error(req.context, detail)
            self.return_error(
                resp,
                falcon.HTTP_500,
                message="Error accessing design list",
                retry=True)
项目:monasca-events-api    作者:openstack    | 项目源码 | 文件源码
def test_should_pass_simple_event(self, bulk_processor):
        """POSTing the single-event fixture yields HTTP 200."""
        resource = _init_resource(self)
        resource._processor = bulk_processor

        # The fixture lives next to this test module.
        fixture_path = os.path.join(
            os.path.dirname(__file__),
            'event_template_json/req_simple_event.json')
        with open(fixture_path, 'r') as fixture:
            payload = fixture.read()

        request_headers = {
            'Content-Type': 'application/json',
            'X_ROLES': 'monasca',
        }
        self.simulate_request(
            path=ENDPOINT,
            method='POST',
            headers=request_headers,
            body=payload,
        )
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
项目:monasca-events-api    作者:openstack    | 项目源码 | 文件源码
def test_should_multiple_events(self, bulk_processor):
        """POSTing the multiple-events fixture yields HTTP 200."""
        resource = _init_resource(self)
        resource._processor = bulk_processor

        # The fixture lives next to this test module.
        fixture_path = os.path.join(
            os.path.dirname(__file__),
            'event_template_json/req_multiple_events.json')
        with open(fixture_path, 'r') as fixture:
            payload = fixture.read()

        request_headers = {
            'Content-Type': 'application/json',
            'X_ROLES': 'monasca',
        }
        self.simulate_request(
            path=ENDPOINT,
            method='POST',
            headers=request_headers,
            body=payload,
        )
        self.assertEqual(falcon.HTTP_200, self.srmock.status)
项目:spacy-dev-resources    作者:explosion    | 项目源码 | 文件源码
def on_post(self, req, resp):
        """Extract entities from each posted paragraph and return them as JSON.

        The request body is a JSON object with a ``paragraphs`` list (each
        item providing ``text``) and an optional ``model`` name (default
        ``'en'``). On success the response is 200 with a JSON list holding
        one entry per paragraph; any exception during extraction produces
        a 500 and is logged with its traceback.
        """
        import logging

        req_body = req.stream.read()
        json_data = json.loads(req_body.decode('utf8'))
        paragraphs = json_data.get('paragraphs')
        model_name = json_data.get('model', 'en')
        try:
            model = get_model(model_name)
            entities = [
                Entities(model, p.get('text')).to_json() for p in paragraphs
            ]
            resp.body = json.dumps(entities, sort_keys=True, indent=2)
            resp.content_type = 'application/json'
            resp.status = falcon.HTTP_200
        except Exception:
            # Previously the failure was dropped without a trace; keep the
            # 500 response but record why it happened.
            logging.getLogger(__name__).exception('Entity extraction failed')
            resp.status = falcon.HTTP_500
项目:spacy-dev-resources    作者:explosion    | 项目源码 | 文件源码
def on_post(self, req, resp):
        """Update the model vocabulary, then build training entities as JSON.

        The request body is a JSON object with a ``paragraphs`` list (each
        item providing ``text`` and ``tags``) and an optional ``model``
        name (default ``'en'``). Any exception during processing is
        printed and answered with a 500.
        """
        raw_body = req.stream.read()
        payload = json.loads(raw_body.decode('utf8'))
        paragraphs = payload.get('paragraphs')
        model_name = payload.get('model', 'en')
        try:
            model = get_model(model_name)

            # The vocabulary is updated with all texts before any entity
            # is built.
            texts = []
            for paragraph in paragraphs:
                texts.append(paragraph.get('text'))
            update_vocabulary(model, texts)

            entities = [
                TrainEntities(model, p.get('text'), p.get('tags')).to_json()
                for p in paragraphs
            ]
            resp.body = json.dumps(entities, sort_keys=True, indent=2)
            resp.content_type = 'application/json'
            resp.status = falcon.HTTP_200
        except Exception:
            print("Unexpected error:", sys.exc_info()[0])
            resp.status = falcon.HTTP_500
项目:wechat-bot-skeleton-python    作者:edonosotti    | 项目源码 | 文件源码
def on_get(self, request, response):
        """Answer a signature-verification GET request.

        The expected signature is the SHA-1 hex digest of the sorted and
        concatenated ``self.token``, ``timestamp`` and ``nonce`` values.
        When it matches the ``signature`` query parameter the response is
        200 with ``echostr`` as body; otherwise (including when a required
        parameter is missing) it is 500 with an empty body.
        """
        import hmac

        # Get the parameters from the query string
        signature = request.get_param('signature')
        timestamp = request.get_param('timestamp')
        nonce = request.get_param('nonce')
        echostr = request.get_param('echostr')

        # A missing parameter can never verify; reject it up front instead
        # of failing while sorting/joining None values.
        if signature is None or timestamp is None or nonce is None:
            response.status = falcon.HTTP_500
            response.body = ""
            return

        # Compute the signature (note that the shared token is used too)
        verification_elements = sorted([self.token, timestamp, nonce])
        verification_string = hashlib.sha1(
            "".join(verification_elements).encode('utf-8')).hexdigest()

        # Compare as bytes in constant time so the check leaks no timing
        # information and accepts non-ASCII input without raising.
        signature_matches = hmac.compare_digest(
            signature.encode('utf-8'), verification_string.encode('utf-8'))

        # If the signature is correct, output the same "echostr" provided
        # by the WeChat server as a parameter
        if signature_matches:
            response.status = falcon.HTTP_200
            response.body = echostr
        else:
            response.status = falcon.HTTP_500
            response.body = ""

    # Messages will be POSTed from the WeChat server to the chatbot backend server,
    # see: http://admin.wechat.com/wiki/index.php?title=Common_Messages
项目:wechat-bot-skeleton-python    作者:edonosotti    | 项目源码 | 文件源码
def on_post(self, request, response):
        """Queue a valid incoming message and acknowledge it immediately.

        Both outcomes answer with 200: a valid message gets the formatted
        instant reply, an invalid one gets a plain error text.
        """
        formatter = WeChatMessageFormatter()
        incoming = formatter.parse_incoming_message(request.bounded_stream.read())

        if not incoming['valid']:
            response.status = falcon.HTTP_200
            response.body = "Message was sent in a wrong format."
            return

        # Queue the message for delayed processing
        self.db_manager.queue_message(incoming)

        # WeChat always requires incoming user messages to be acknowledged
        # at least with an empty string (empty strings are not shown to
        # users), see:
        # https://chatbotsmagazine.com/building-chatbots-for-wechat-part-1-dba8f160349
        # This sample app simulates a "Customer Service"-like scenario by
        # replying instantly and announcing that a complete reply follows.
        acknowledgement = "Thank you for your message. We will get back to you as soon as possible!"
        response.status = falcon.HTTP_200
        response.body = formatter.format_instant_reply(incoming, acknowledgement)
项目:keycloak-python    作者:ibotty    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """Report whether the caller's grant has the ``resources-writer`` role.

        Always answers 200. Without a configured ``self.keycloak`` a static
        message is returned; otherwise the body states whether the role is
        present and includes the decoded access token when it can be
        decoded.
        """
        resp.status = falcon.HTTP_200

        if self.keycloak:
            grant = self.keycloak.get_grant(req, resp)

            if self.keycloak.grant_has_role(grant, "resources-writer"):
                resp.body = "has role! token: %s" % self.keycloak.manager.decode_token(grant.access_token)
            else:
                try:
                    resp.body = "No role! token: %s" % self.keycloak.manager.decode_token(grant.access_token)
                except Exception:
                    # Any failure to read or decode the token is reported
                    # as a missing token. Catching Exception instead of
                    # using a bare except lets SystemExit and
                    # KeyboardInterrupt propagate.
                    resp.body = "No valid bearer token."
        else:
            resp.body = """
            Everything a-ok.
            """
项目:orbital-cotwo-web    作者:chronos-pramantha    | 项目源码 | 文件源码
def on_post(self, req, resp):
        """Look up stored data for the posted GeoJSON and store it in context.

        Reads ``req.context['geojson']`` and writes the serialized result
        to ``req.context['result']``; the status is set to 200 up front.
        """
        resp.status = falcon.HTTP_200

        # Convert the GeoJSON coordinates into EWKT.
        geojson = req.context['geojson']
        coordinate_list = spatial.coordinates_from_geojson(geojson)
        ewkt = spatial.from_list_to_ewkt(coordinate_list)
        print(ewkt)

        # Build the controller view and let it find the matching areas.
        view = Controller(ewkt)
        view.which_areas_contains_this_polygon()

        # Dump whatever the controller retrieved.
        serialized = view.serialize_features_from_database()

        print(str(view))
        print(serialized)

        req.context['result'] = serialized
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """Serve the HTML landing page that links to the ``quote`` endpoint."""
        # Both placeholders receive the configured URL prefix.
        landing_page = """
<html>
  <head>
    <title>Quote API Server</title>
  </head>
  <body>
    <p>This is a toy JSON API server example.</p>
    <p>Make a GET request to <a href="%s/quote">%s/quote</a></p>
  </body>
</html>
""" % (self.prefix, self.prefix)
        resp.body = landing_page
        resp.content_type = "text/html"
        resp.status = falcon.HTTP_200


# A Falcon middleware to implement validation of the Host header in requests
项目:promenade    作者:att-comdev    | 项目源码 | 文件源码
def _return_msg(self, resp, status_code, status="Valid", message=""):
        """Write a ``Status`` JSON document to ``resp.body``.

        Args:
            resp: Response object whose ``body`` is set.
            status_code: Status value stored under ``code``; anything other
                than ``falcon.HTTP_200`` counts as one error.
            status: Text stored under ``status``.
            message: Text stored under ``message`` and, for a non-200
                ``status_code``, as the only entry of ``messageList``.
        """
        # Compare by value: identity (`is`) only holds when the caller
        # passes the very same string object as falcon.HTTP_200.
        if status_code == falcon.HTTP_200:
            count = 0
            msg_list = []
        else:
            count = 1
            msg_list = [message]
        resp.body = json.dumps({
            "kind": "Status",
            "apiVersion": "v1",
            "metadata": {},
            "status": status,
            "message": message,
            "reason": "Validation",
            "details": {
                "errorCount": count,
                "messageList": msg_list,
            },
            "code": status_code,
        })
项目:Cuppa    作者:flipkart-incubator    | 项目源码 | 文件源码
def test_on_get(self):
        """Check the health-check responder's success and failure paths."""
        # Let at least one second pass so the reported uptime is >= 1.
        time.sleep(1)
        req = None
        resp = falcon.Response()
        resp.status = None
        resp.body = None
        self.elb_resource.on_get(req, resp)
        response_body = json.loads(resp.body)
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed in Python 3.12.
        self.assertEqual(response_body['capacity'], 100)
        self.assertEqual(response_body['requests'], 1)
        self.assertGreaterEqual(response_body['uptime'], 1)
        self.assertEqual(resp.status, falcon.HTTP_200)

        # Force the "map is empty" condition for the failure path.
        self.load_balancer.check_if_model_to_workers_map_is_empty = MagicMock(return_value=True)

        resp_1 = falcon.Response()
        resp_1.status = None
        resp_1.body = None

        self.assertRaises(
            falcon.HTTPInternalServerError,
            self.elb_resource.on_get, req, resp_1)
项目:Cuppa    作者:flipkart-incubator    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """
        Answer the ELB health check with uptime, requests and capacity.

        :param req: request object (unused)
        :param resp: response object that receives status and body
        :raises falcon.HTTPInternalServerError: when the load balancer
            reports an empty model-to-workers map
        """
        uptime = int(time.time()) - self.start_time

        map_is_empty = self.load_balancer.check_if_model_to_workers_map_is_empty()
        if map_is_empty:
            resp.status = falcon.HTTP_500
            resp.body = "Model To Workers Map is Empty"
            raise falcon.HTTPInternalServerError('Internal Server Error', 'Model To Workers Map is Empty! ')

        # TODO requests and capacity have to be calculated. They are hardcoded for now
        health_report = {'uptime':uptime, 'requests': 1, 'capacity': 100}

        resp.status = falcon.HTTP_200
        resp.body = json.dumps(health_report)
项目:Cuppa    作者:flipkart-incubator    | 项目源码 | 文件源码
def on_post(self, req, resp):
        """Run a prediction for the ``url`` and ``modelId`` in the JSON body.

        :raises falcon.HTTPBadRequest: when ``url`` or ``modelId`` is missing
        :raises falcon.HTTPInternalServerError: when the worker reports
            ``'Failure'``
        """
        payload = json.loads(req.stream.read())
        pi = PredictionInput()

        # Both keys are required; reject the request otherwise.
        provided = payload.keys()
        if not ('url' in provided and 'modelId' in provided):
            resp.status = falcon.HTTP_400
            raise falcon.HTTPBadRequest("Bad Request", "Url and(or) modelId missing in the payload")

        pi.url = payload['url']
        pi.model_id = payload['modelId']

        po = self.caffe_worker_client.predict(pi)
        outcome = po.bo.status

        # Any other outcome leaves the response untouched.
        if outcome == 'Success':
            resp.status = falcon.HTTP_200
            resp.body = (str(po.values))
        elif outcome == 'Failure':
            resp.body = json.dumps({'status': 'Failure', 'message' : 'Error occurred'})
            resp.status = falcon.HTTP_500
            raise falcon.HTTPInternalServerError('Internal Server Error', 'Predict failed! ')
项目:peach    作者:sebastiandev    | 项目源码 | 文件源码
def __init__(self,
                 app,
                 prefix,
                 name=None,
                 version=None,
                 media_type=None,
                 **kwargs):
        """Attach the API entry point and error handler to ``app``.

        Registers a GET resource at ``/<prefix>`` that returns the API name
        and version as JSON, copies the media type onto ``app`` and installs
        ``self.handle_error`` for ``ApiException``.
        """
        self._media_type = media_type or self.MEDIA_TYPE

        # Resolve the advertised values once; the arguments never change.
        api_name = name or 'Peach Rest Api'
        api_version = version or '0.0.0'

        class EntryPoint(object):

            def on_get(self, req, resp):
                description = {
                    'name': api_name,
                    'version': api_version,
                }
                resp.body = json.dumps(description)
                resp.status = falcon.HTTP_200

        entry_route = '/{}'.format(prefix)

        app._media_type = self._media_type
        app.add_route(entry_route, EntryPoint())
        app.add_error_handler(ApiException, self.handle_error)
项目:py2swagger    作者:Arello-Mobile    | 项目源码 | 文件源码
def on_get(self, req, resp, id):
        """
        Handles GET requests for another test
        ---
        tags:
        - test
        responses:
          200:
            schema:
              type: string
        security:
          api_key: []
        securityDefinitions:
          api_key:
            type: apiKey
            in: Header
            name: Authorization
        """
        # NOTE(review): the docstring section after `---` looks like a YAML
        # API description that tooling may parse; confirm before rewording.
        resp.status = falcon.HTTP_200  # This is the default status
        # Echo the `id` argument back inside the greeting text.
        resp.body = '\nHello world with {}\n\n'.format(id)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def on_get(self, req, resp, **kwargs):
        """Remember the call arguments and send back the sample response.

        Args:
            req: Falcon ``Request`` instance.
            resp: Falcon ``Response`` instance.
            kwargs: Keyword arguments passed to the responder.

        """

        # Instances are reused across requests, so the captured values
        # always belong to the latest call only.
        self.req = req
        self.resp = resp
        self.kwargs = kwargs
        self.called = True

        resp.status = falcon.HTTP_200
        resp.body = self.sample_body
        resp.set_headers(self.resp_headers)
项目:plex-watched-sync    作者:fiLLLip    | 项目源码 | 文件源码
def on_delete(self, req, resp, account_id):
        """
        Rejects DELETE requests: account deletion is currently disabled.

        The disabled implementation removed the account from each of its
        servers, deleted the account, and answered 200 with
        ``{"deleted": "success"}`` or ``{"deleted": "failed"}``. It sat
        unreachable behind the unconditional raise and has been dropped.

        Raises:
            falcon.HTTPMethodNotAllowed: Always.
        """
        # HTTPMethodNotAllowed requires the list of allowed methods; raising
        # the bare class fails with a TypeError (a 500) instead of a 405.
        # Advertise every other method this resource has a responder for.
        allowed_methods = [
            method for method in falcon.HTTP_METHODS
            if method != 'DELETE' and hasattr(self, 'on_' + method.lower())
        ]
        raise falcon.HTTPMethodNotAllowed(allowed_methods)
项目:iris-relay    作者:linkedin    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """
        Wrap the ``content`` query parameter in a TwiML ``Say`` response.

        The optional ``loop`` query parameter is forwarded to ``say``.

        Example:
            Query strings:

            content: OK

            TwiML:

            <?xml version="1.0" encoding="UTF-8"?>
            <Response>
                <Say language="en-US" voice="alice">OK</Say>
            </Response>
        """
        spoken_text = req.get_param('content')
        repeat = req.get_param('loop')

        twiml_response = twiml.Response()
        twiml_response.say(
            spoken_text, voice='alice', loop=repeat, language="en-US")

        resp.status = falcon.HTTP_200
        resp.body = str(twiml_response)
        resp.content_type = 'application/xml'
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """Return the HTML index page pointing at the ``quote`` endpoint."""
        # The URL prefix is substituted into both the link and its label.
        index_html = """
<html>
  <head>
    <title>Quote API Server</title>
  </head>
  <body>
    <p>This is a toy JSON API server example.</p>
    <p>Make a GET request to <a href="%s/quote">%s/quote</a></p>
  </body>
</html>
""" % (self.prefix, self.prefix)
        resp.body = index_html
        resp.content_type = "text/html"
        resp.status = falcon.HTTP_200


# A Falcon middleware to implement validation of the Host header in requests
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_status_retrieve(self):
        """
        The status endpoint answers 200 with the expected status document.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store_manager = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store_manager]

            # Give the canned return value one empty child, exposed both
            # as `_children` and as `leaves`.
            leaf = MagicMock(value='')
            self.return_value._children = [leaf]
            self.return_value.leaves = self.return_value._children
            store_manager.get.return_value = self.return_value

            body = self.simulate_request('/api/v0/status')
            self.assertEqual(self.srmock.status, falcon.HTTP_200)

            expected = json.loads(self.astatus)
            self.assertEqual(expected, json.loads(body[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_clusters_listing(self):
        """
        The clusters endpoint answers 200 with the list of cluster names.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store_manager = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store_manager]

            # The store lists exactly one cluster.
            only_cluster = clusters.Cluster.new(
                name=self.cluster_name, status='', hostset=[])
            store_manager.list.return_value = clusters.Clusters(
                clusters=[only_cluster])

            body = self.simulate_request('/api/v0/clusters')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)

            self.assertEqual([self.cluster_name], json.loads(body[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_delete(self):
        """
        Verify deleting a cluster.

        Expects 200 for a successful delete and 404 once the store's
        ``delete`` raises ``etcd.EtcdKeyNotFound``; both bodies are ``{}``.
        """
        with mock.patch('cherrypy.engine.publish') as _publish:
            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            # Verify with proper deletion
            manager.get.return_value = MagicMock()
            body = self.simulate_request(
                '/api/v0/cluster/development', method='DELETE')
            # Get is called to verify cluster exists
            # assertEqual replaces the deprecated assertEquals alias, which
            # was removed in Python 3.12.
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            self.assertEqual('{}', body[0])

            # Verify when key doesn't exist
            manager.delete.side_effect = etcd.EtcdKeyNotFound
            body = self.simulate_request(
                '/api/v0/cluster/development', method='DELETE')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            self.assertEqual('{}', body[0])
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_hosts_retrieve(self):
        """
        Listing a cluster's hosts gives 200 with the hosts, 404 otherwise.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store_manager = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store_manager]

            # Existing cluster: its host list is returned.
            store_manager.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
            body = self.simulate_request('/api/v0/cluster/cluster/hosts')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            self.assertEqual(['10.2.0.2'], json.loads(body[0]))

            # Store lookup fails: the endpoint answers 404 with an empty
            # JSON object.
            store_manager.get.side_effect = Exception
            body = self.simulate_request('/api/v0/cluster/bogus/hosts')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            self.assertEqual({}, json.loads(body[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_host_insert(self):
        """
        PUTting a host into a cluster gives 200, or 404 when lookup fails.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store_manager = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store_manager]

            # Store returns a cluster: the insert succeeds.
            store_manager.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
            body = self.simulate_request(
                '/api/v0/cluster/developent/hosts/10.2.0.3', method='PUT')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            self.assertEqual({}, json.loads(body[0]))

            # Store lookup fails: the endpoint answers 404 with an empty
            # JSON object.
            store_manager.get.side_effect = Exception
            body = self.simulate_request(
                '/api/v0/cluster/bogus/hosts/10.2.0.3', method='PUT')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            self.assertEqual({}, json.loads(body[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_host_delete(self):
        """
        DELETEing a host from a cluster gives 200, or 404 when lookup fails.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store_manager = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store_manager]

            # Store returns a cluster: the delete succeeds.
            store_manager.get.return_value = make_new(CLUSTER_WITH_FLAT_HOST)
            body = self.simulate_request(
                '/api/v0/cluster/development/hosts/10.2.0.2', method='DELETE')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            self.assertEqual({}, json.loads(body[0]))

            # Store lookup fails: the endpoint answers 404 with an empty
            # JSON object.
            store_manager.get.side_effect = Exception
            body = self.simulate_request(
                '/api/v0/cluster/bogus/hosts/10.2.0.2', method='DELETE')
            self.assertEqual(falcon.HTTP_404, self.srmock.status)
            self.assertEqual({}, json.loads(body[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_cluster_deploy_retrieve(self):
        """
        Check the GET endpoint for a cluster deploy.

        With the store returning a deploy record the endpoint must answer
        200 with that record as JSON; when the second store lookup raises
        it must answer 204 with no body.
        """
        deploy_url = '/api/v0/cluster/development/deploy'

        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            deploy = make_new(CLUSTER_DEPLOY)

            # Every store lookup returns the deploy record.
            store.get.return_value = deploy
            response = self.simulate_request(deploy_url)
            self.assertEqual(falcon.HTTP_200, self.srmock.status)
            self.assertEqual(
                json.loads(deploy.to_json()),
                json.loads(response[0]))

            # First store lookup returns the record, the second one raises.
            store.reset_mock()
            store.get.side_effect = (deploy, Exception)

            response = self.simulate_request(deploy_url)
            self.assertEqual(falcon.HTTP_204, self.srmock.status)
            # A 204 carries no body chunks at all.
            self.assertEqual([], response)
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_host_creds_retrieve(self):
        """
        Check the GET endpoint for a host's credentials.

        A successful store lookup must yield 200 and the JSON in
        HOST_CREDS_JSON; a failing lookup must yield 404 and an empty
        JSON object.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            # Store lookup succeeds with the HOST fixture.
            store.get.return_value = make_new(HOST)

            response = self.simulate_request('/api/v0/host/10.2.0.2/creds')
            self.assertEqual(self.srmock.status, falcon.HTTP_200)
            expected_creds = json.loads(HOST_CREDS_JSON)
            self.assertEqual(expected_creds, json.loads(response[0]))

            # Store lookup now raises for any host.
            store.reset_mock()
            store.get.side_effect = Exception

            response = self.simulate_request('/api/v0/host/10.9.9.9/creds')
            self.assertEqual(self.srmock.status, falcon.HTTP_404)
            self.assertEqual({}, json.loads(response[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_host_status_retrieve_host_only(self):
        """
        Verify retrieving Host status when it is in a host only cluster.

        The mocked store returns the host on its first lookup and a
        cluster whose hostset contains that host's address on the second.
        The response must be a 200 whose JSON reports the host-only
        cluster type, an 'available' host status and an empty
        container manager section.
        """
        with mock.patch('cherrypy.engine.publish') as _publish:
            manager = mock.MagicMock(StoreHandlerManager)
            _publish.return_value = [manager]

            test_host = make_new(HOST)
            test_cluster = make_new(CLUSTER)
            test_cluster.hostset = [test_host.address]

            # Successive store lookups return the host, then the cluster.
            manager.get.side_effect = (
                test_host,
                test_cluster)

            body = self.simulate_request('/api/v0/host/10.2.0.2/status')
            self.assertEqual(self.srmock.status, falcon.HTTP_200)
            result = json.loads(body[0])
            self.assertEqual(C.CLUSTER_TYPE_HOST, result['type'])
            self.assertEqual('available', result['host']['status'])
            self.assertEqual({}, result['container_manager'])
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def test_networks_listing(self):
        """
        Check the GET endpoint that lists Networks.

        The mocked store lists a single network named
        ``self.network_name``; the endpoint must answer 200 with a JSON
        list holding just that name.
        """
        with mock.patch('cherrypy.engine.publish') as publish_mock:
            store = mock.MagicMock(StoreHandlerManager)
            publish_mock.return_value = [store]

            # The store listing contains exactly one network.
            only_network = networks.Network.new(name=self.network_name)
            store.list.return_value = networks.Networks(
                networks=[only_network])

            response = self.simulate_request('/api/v0/networks')
            self.assertEqual(falcon.HTTP_200, self.srmock.status)

            expected_names = [self.network_name]
            self.assertEqual(expected_names, json.loads(response[0]))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_delete(self, req, resp, name):
        """
        Handles the deletion of a Cluster.

        The response body is always ``{}``. The status is 200 when the
        store delete succeeds and 404 when fetching the store manager or
        the delete itself raises.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being deleted.
        :type name: str
        """
        resp.body = '{}'

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            store_manager.delete(Cluster.new(name=name))
        except Exception:
            # Only ordinary errors are mapped to 404. KeyboardInterrupt and
            # SystemExit are deliberately allowed to propagate.
            self.logger.info(
                'Deleting for non-existent cluster {0} requested.'.format(
                    name))
            resp.status = falcon.HTTP_404
        else:
            resp.status = falcon.HTTP_200
            self.logger.info(
                'Deleted cluster {0} per request.'.format(name))
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_delete(self, req, resp, name, address):
        """
        Handles DELETE requests for individual hosts in a Cluster.

        Delegates the removal to ``util.etcd_cluster_remove_host`` and
        answers 200 when it returns, or 404 when it raises ``KeyError``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The name of the Cluster being requested.
        :type name: str
        :param address: The address of the Host being requested.
        :type address: str
        """
        try:
            util.etcd_cluster_remove_host(name, address)
        except KeyError:
            resp.status = falcon.HTTP_404
        else:
            resp.status = falcon.HTTP_200
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """
        Handles GET requests for Hosts.

        On success the status is 200 and the listed hosts are stored in
        ``req.context['model']``. If the listing is empty, or anything in
        the lookup raises, the status is 404 and the model is ``None``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        """

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            hosts = store_manager.list(Hosts(hosts=[]))
            if len(hosts.hosts) == 0:
                # Route the empty listing through the same 404 path as a
                # failed store lookup.
                raise Exception()
            resp.status = falcon.HTTP_200
            req.context['model'] = hosts
        except Exception:
            # This was originally a "no content" but I think a 404 makes
            # more sense if there are no hosts
            self.logger.warning(
                'Store does not have any hosts. Returning [] and 404.')
            resp.status = falcon.HTTP_404
            req.context['model'] = None
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_get(self, req, resp):
        """
        Handles GET requests for Networks.

        On success the status is 200 and the body is a JSON list of the
        network names. If the listing is empty, or anything in the lookup
        raises, the status is 404 and ``req.context['model']`` is ``None``.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        """

        try:
            store_manager = cherrypy.engine.publish('get-store-manager')[0]
            listing = store_manager.list(Networks(networks=[]))
            if len(listing.networks) == 0:
                # Route the empty listing through the same 404 path as a
                # failed store lookup.
                raise Exception()
            resp.status = falcon.HTTP_200
            resp.body = json.dumps([
                network.name for network in listing.networks])
        except Exception:
            self.logger.warning(
                'Store does not have any networks. Returning [] and 404.')
            resp.status = falcon.HTTP_404
            req.context['model'] = None
项目:commissaire-mvp    作者:projectatomic    | 项目源码 | 文件源码
def on_delete(self, req, resp, name):
        """
        Handles the Deletion of a Network.

        The response body is always ``{}``. The status is 200 when the
        store delete succeeds and 404 when it raises; the error is logged
        as a warning. A failure to obtain the store manager is not caught
        here.

        :param req: Request instance that will be passed through.
        :type req: falcon.Request
        :param resp: Response instance that will be passed through.
        :type resp: falcon.Response
        :param name: The friendly name of the network.
        :type name: str
        """
        resp.body = '{}'
        store_manager = cherrypy.engine.publish('get-store-manager')[0]
        try:
            store_manager.delete(Network.new(name=name))
            resp.status = falcon.HTTP_200
        except Exception as error:
            self.logger.warning('{}: {}'.format(type(error), error))
            resp.status = falcon.HTTP_404
项目:gyr    作者:non-Jedi    | 项目源码 | 文件源码
def on_get(self, request, response, room_alias=None):
        """Answer a GET request sent to /rooms/{room_alias}.

        The body is always "{}". When ``self.handler`` rejects the alias
        the status is 404; otherwise the status is 200 and the room is
        created through ``self.api``.
        """
        response.body = "{}"

        # Guard clause: the handler declined this alias.
        if not self.handler(room_alias):
            response.status = falcon.HTTP_404
            return

        response.status = falcon.HTTP_200
        self.api.create_room(alias=room_alias)
项目:gyr    作者:non-Jedi    | 项目源码 | 文件源码
def on_get(self, request, response, user_id=None):
        """Answer a GET request for a user.

        The body is always "{}". When ``self.handler`` rejects the user id
        the status is 404; otherwise the status is 200 and the localpart
        of the id is registered through ``self.api``.
        """
        response.body = "{}"

        # Guard clause: the handler declined this user id.
        if not self.handler(user_id):
            response.status = falcon.HTTP_404
            return

        response.status = falcon.HTTP_200
        localpart = utils.mxid2localpart(user_id)
        self.api.register(localpart)