Python flask 模块,Response() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.Response()

项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def get(self, oid):
        """Return the stored item(s) as a JSON response.

        Checks that the caller is authorized to read this resource class,
        runs the DB query for ``oid`` and serialises the result.  Any
        exception raised on the way is turned into a formatted error reply.

        :arg self: The class of the object to be retrieved
        :arg integer oid: the ID of the object in the DB (per the original
            documentation, ``None`` selects all items)
        :returns: A JSON ``Response`` on success, otherwise the value of
            ``error.format_exception`` for a failed GET

        """
        try:
            resource_cls = self.__class__
            ensure_authorized_to('read', resource_cls)
            rows = self._db_query(oid)
            body = self._create_json_response(rows, oid)
            return Response(body, mimetype='application/json')
        except Exception as exc:
            target_name = self.__class__.__name__.lower()
            return error.format_exception(exc, target=target_name, action='GET')
项目:quokka_ng    作者:rochacbruno    | 项目源码 | 文件源码
def export_to_csv(self, ids):
        """Stream the documents selected by ``ids`` as a CSV attachment.

        The documents are loaded through ``self.model.objects(id__in=ids)``
        and round-tripped through JSON so each one is a plain dict.  Rows are
        written with the ``csv`` module so values containing commas, quotes
        or newlines are escaped, and every row is aligned to one shared list
        of columns (records missing a key get an empty cell).

        :param ids: identifiers passed to the ``id__in`` query filter
        :returns: a streaming ``Response`` with mimetype ``text/csv`` and a
            ``Content-Disposition`` header named after the model
        """
        import csv
        import io

        records = json.loads(self.model.objects(id__in=ids).to_json())

        # Start from the widest record (this was the original header), then
        # append any key that only appears in other records so nothing is
        # dropped and all rows share the same layout.
        columns = list(max(records, key=len).keys()) if records else []
        seen = set(columns)
        for record in records:
            for key in record:
                if key not in seen:
                    seen.add(key)
                    columns.append(key)

        def generate():
            # An empty selection yields an empty body instead of failing
            # in the middle of the stream.
            if not records:
                return

            buffer = io.StringIO()
            csv_writer = csv.writer(buffer, lineterminator='\n')

            def render(row):
                # Write one row, hand it out, and reset the scratch buffer.
                csv_writer.writerow(row)
                line = buffer.getvalue()
                buffer.seek(0)
                buffer.truncate(0)
                return line

            yield render(columns)
            for record in records:
                yield render(
                    [str(record[col]) if col in record else '' for col in columns]
                )

        return Response(
            generate(),
            mimetype="text/csv",
            headers={
                "Content-Disposition":
                "attachment;filename=%s.csv" % self.model.__name__.lower()
            }
        )
项目:zimfarm    作者:openzim    | 项目源码 | 文件源码
def enqueue_mwoffliner():
    """Queue one ``mwoffliner`` task per config object in the request body.

    The caller's JWT must belong to an admin.  The JSON body must be a list
    of dicts, each carrying ``mwUrl`` and ``adminEmail``.  All entries are
    validated before any task is sent, so one bad entry rejects the batch.

    :raises exception.NotEnoughPrivilege: the token is not an admin token
    :raises exception.InvalidRequest: the body or one of its entries is invalid
    :returns: an empty ``Response`` with status 202
    """
    # only admins can enqueue task
    if not UserJWT.from_request_header(request).is_admin:
        raise exception.NotEnoughPrivilege()

    payload = request.get_json()
    if not isinstance(payload, list):
        raise exception.InvalidRequest()

    # first pass: every entry must be a dict holding the mandatory data
    for entry in payload:
        if not isinstance(entry, dict):
            raise exception.InvalidRequest()
        if entry.get('mwUrl') is None or entry.get('adminEmail') is None:
            raise exception.InvalidRequest()

    # second pass: send each task to celery and record it in the collection
    task_name = 'mwoffliner'
    for entry in payload:
        task_kwargs = {
            'token': TaskJWT.new(task_name),
            'config': entry
        }
        queued = celery.send_task(task_name, kwargs=task_kwargs)

        timestamps = {'created': datetime.utcnow(), 'started': None, 'ended': None}
        TasksCollection().insert_one({
            '_id': queued.id,
            'celery_task_name': task_name,
            'status': 'PENDING',
            'time_stamp': timestamps,
            'options': entry,
            'steps': []
        })

    return Response(status=202)
项目:zimfarm    作者:openzim    | 项目源码 | 文件源码
def invalid_request(_):
    """Answer with an empty 400 response; the single argument is ignored."""
    bad_request = Response(status=400)
    return bad_request
项目:PEBA    作者:dtag-dev-sec    | 项目源码 | 文件源码
def retrieveAlertsCyber():
    """ Retrieve Alerts from ElasticSearch and return formatted
        XML with limited alert content.

        The formatted result is cached under the request URL; a cache hit is
        served with the same ``text/xml`` mimetype as a freshly built result
        (previously the cached copy went out with Flask's default mimetype).
    """

    # get result from cache
    getCacheResult = getCache(request.url, "url")
    if getCacheResult is not False:
        app.logger.debug('Returning /retrieveAlertsCyber from Cache for %s' % str(request.remote_addr))
        return Response(getCacheResult, mimetype='text/xml')

    # query ES
    returnResult = formatAlertsXml(queryAlerts(app.config['MAXALERTS'], checkCommunityIndex(request)))
    setCache(request.url, returnResult, 1, "url")
    app.logger.debug('Returning /retrieveAlertsCyber from ES for %s' % str(request.remote_addr))
    return Response(returnResult, mimetype='text/xml')
项目:PEBA    作者:dtag-dev-sec    | 项目源码 | 文件源码
def querySingleIP():
    """ Retrieve Attack data from index about a single IP.

        The IP is read from the ``ip`` GET parameter.  The formatted result
        is cached under the request URL; a cache hit is served with the same
        ``text/xml`` mimetype as a freshly built result (previously the
        cached copy went out with Flask's default mimetype).
    """

    # get result from cache
    getCacheResult = getCache(request.url, "url")
    if getCacheResult is not False:
        app.logger.debug('Returning /querySingleIP from Cache for %s' % str(request.remote_addr))
        return Response(getCacheResult, mimetype='text/xml')

    # query ES
    returnResult = formatSingleIP(queryForSingleIP(app.config['MAXALERTS'], request.args.get('ip'), checkCommunityIndex(request)))
    setCache(request.url, returnResult, 60, "url")
    app.logger.debug('Returning /querySingleIP from ES for %s' % str(request.remote_addr))
    return Response(returnResult, mimetype='text/xml')

# Routes with both XML and JSON output
项目:PEBA    作者:dtag-dev-sec    | 项目源码 | 文件源码
def retrieveAlertsCount():
    """ Return the number of alerts in a timeframe as XML (default) or JSON.

        The timeframe comes from the ``time`` GET parameter; without it the
        configured default response is returned.  ``out=json`` selects JSON
        output.  Results are looked up in, and stored to, the URL cache.
    """
    timeframe = request.args.get('time')
    if not timeframe:
        app.logger.error('No time GET-parameter supplied in retrieveAlertsCount. Must be decimal number (in minutes) or string "day"')
        return app.config['DEFAULTRESPONSE']

    wants_json = request.args.get('out') == 'json'

    # serve from cache when possible, otherwise query and remember the result
    payload = getCache(request.url, "url")
    if payload is False:
        output_format = 'json' if wants_json else 'xml'
        payload = formatAlertsCount(queryAlertsCount(timeframe, checkCommunityIndex(request)), output_format)
        setCache(request.url, payload, 60, "url")

    if wants_json:
        return jsonify(payload)
    return Response(payload, mimetype='text/xml')
项目:PEBA    作者:dtag-dev-sec    | 项目源码 | 文件源码
def retrieveIPs():
    """ Return the bad-IP list as XML (default) or JSON when ``out=json``.

        Results are looked up in, and stored to, the URL cache.
    """
    wants_json = request.args.get('out') == 'json'

    payload = getCache(request.url, "url")
    if payload is False:
        output_format = 'json' if wants_json else 'xml'
        payload = formatBadIP(
            queryBadIPs(app.config['BADIPTIMESPAN'], checkCommunityIndex(request)), output_format)
        setCache(request.url, payload, 60, "url")

    if wants_json:
        return jsonify(payload)
    return Response(payload, mimetype='text/xml')


# Routes with JSON output
项目:PEBA    作者:dtag-dev-sec    | 项目源码 | 文件源码
def getSimpleMessage():
    """Tell the caller that this action needs POST (HTML body, status 500)."""
    return Response(
        response="POST is required for this action.",
        status=500,
        mimetype='text/html',
    )
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def authFailure():
    """ Build the 401 reply that asks the client for HTTP basic credentials """
    challenge = {'WWW-Authenticate': 'Basic realm="Login Required"'}
    return Response(response="Authentication failed!", status=401, headers=challenge)
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def authed(func: Callable[[], str]) -> Callable[[], Union[Response, str]]:
    """ Wrap ``func`` so it only runs for requests with valid basic auth.

        Requests without credentials, or with credentials rejected by
        ``validAuth``, receive the ``authFailure()`` response instead.
    """
    @wraps(func)
    def decorator():
        credentials = request.authorization
        if not credentials or not validAuth(credentials.username, credentials.password):
            return authFailure()
        return func()
    return decorator
项目:sitrep    作者:bittorrent    | 项目源码 | 文件源码
def cache(self, seconds=60):
        """Decorator factory marking a view's response as publicly cacheable.

        The wrapped view's return value is converted to a ``flask.Response``
        if needed, then given ``Cache-Control`` and ``Expires`` headers for
        ``seconds`` seconds and an ETag.
        """
        def inner_decorator(f):
            @wraps(f)
            def wrapper(*args, **kwds):
                result = f(*args, **kwds)
                if isinstance(result, flask.Response):
                    resp = result
                else:
                    resp = flask.make_response(result)
                resp.headers['Cache-Control'] = 'public, max-age={}'.format(seconds)
                expires_at = time.gmtime(time.time() + seconds)
                resp.headers["Expires"] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", expires_at)
                resp.add_etag()
                return resp
            return wrapper
        return inner_decorator
项目:sitrep    作者:bittorrent    | 项目源码 | 文件源码
def require_api_token(self, f):
        """Decorator: run ``f`` only when the request carries the API token.

        The token is read from ``X-Status-API-Token``, falling back to
        ``X-Live-Status-API-Token``, and compared with
        ``self.settings['api_token']``.  A mismatch yields an empty JSON
        object with status 403.  Either way the response is marked
        non-cacheable.
        """
        no_cache_headers = (
            ('Cache-Control', 'no-cache, no-store, must-revalidate'),
            ('Pragma', 'no-cache'),
            ('Expires', '0'),
        )

        @wraps(f)
        def wrapper(*args, **kwds):
            request_headers = flask.request.headers
            token = request_headers.get(
                'X-Status-API-Token', request_headers.get('X-Live-Status-API-Token'))
            if token == self.settings['api_token']:
                resp = f(*args, **kwds)
                if not isinstance(resp, flask.Response):
                    resp = flask.make_response(resp)
            else:
                resp = flask.make_response(flask.jsonify({}), 403)
            for header_name, header_value in no_cache_headers:
                resp.headers[header_name] = header_value
            return resp
        return wrapper
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _chat():
    """Forward a question from the query string to ``ask`` and return JSON.

    Reads ``question``, ``session``, ``lang`` (default ``en``), ``query``
    (the string ``true`` enables it) and ``marker`` (default ``default``)
    from the request arguments, plus the ``X-Request-Id`` header.
    """
    args = request.args
    as_query = args.get('query', 'false').lower() == 'true'
    response, ret = ask(
        args.get('question'),
        args.get('lang', 'en'),
        args.get('session'),
        as_query,
        request_id=request.headers.get('X-Request-Id'),
        marker=args.get('marker', 'default'))
    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _batch_chat():
    """Answer a JSON-encoded batch of ``(index, question)`` pairs from the form.

    The ``Auth`` form field must pass ``check_auth``; otherwise the
    ``authenticate()`` response is returned.  Each question is sent to
    ``ask`` and the answers are collected as ``(index, response, ret)``.
    """
    form = request.form
    auth = form.get('Auth')
    if not (auth and check_auth(auth)):
        return authenticate()

    pairs = json.loads(form.get('questions'))
    session = form.get('session')
    lang = form.get('lang', 'en')

    answers = []
    for position, text in pairs:
        response, ret = ask(str(text), lang, session)
        answers.append((position, response, ret))

    body = json_encode({'ret': 0, 'response': answers})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _said():
    """Pass the ``session`` and ``message`` arguments to ``said``; reply in JSON."""
    args = request.args
    ret, response = said(args.get('session'), args.get('message'))
    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _rate():
    """Rate an answer identified by ``session`` and ``index``; reply in JSON.

    ``index`` is converted to ``int`` before calling ``rate_answer``.  When
    the conversion or the call fails, ``ret`` is ``False`` and ``response``
    carries the error text.
    """
    data = request.args
    # Failure defaults: previously ``ret`` was never bound when the call
    # raised, so the error path itself crashed with a NameError.
    ret = False
    response = ''
    try:
        ret = rate_answer(data.get('session'), int(
            data.get('index')), data.get('rate'))
    except Exception as ex:
        # str(ex) works on every exception; ``ex.message`` does not.
        response = str(ex)
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _chatbots():
    """List the characters for the ``lang`` and ``session`` arguments as JSON."""
    args = request.args
    characters = list_character(args.get('lang', None), args.get('session'))
    body = json_encode({'ret': 0, 'response': characters})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _start_session():
    """Start a session for ``user`` and ``botname`` and return its id as JSON.

    ``test`` and ``refresh`` are enabled by the string ``true`` (any case).
    The bot name and user are also stored on the session's ``sdata``.
    """
    args = request.args
    botname = args.get('botname')
    user = args.get('user')
    is_test = args.get('test', 'false').lower() == 'true'
    do_refresh = args.get('refresh', 'false').lower() == 'true'

    sid = session_manager.start_session(
        user=user, key=botname, test=is_test, refresh=do_refresh)

    session_data = session_manager.get_session(sid).sdata
    session_data.botname = botname
    session_data.user = user

    body = json_encode({'ret': 0, 'sid': str(sid)})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _sessions():
    """Return the output of ``session_manager.list_sessions()`` as JSON."""
    body = json_encode({'ret': 0, 'response': session_manager.list_sessions()})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _set_weights():
    """Apply the ``param`` weights for a session and report the outcome as JSON.

    The result of ``set_weights`` is logged: on success the session's stored
    weights (when present), otherwise a failure message.
    """
    args = request.args
    lang = args.get('lang', None)
    param = args.get('param')
    sid = args.get('session')

    ret, response = set_weights(param, lang, sid)
    if not ret:
        logger.info("Set weights failed.")
    else:
        sess = session_manager.get_session(sid)
        if sess and hasattr(sess.sdata, 'weights'):
            logger.info("Set weights {} successfully".format(sess.sdata.weights))

    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _set_context():
    """Parse ``context`` (``k1=v1,k2=v2``) and store it for the session.

    Keys and values are stripped of surrounding whitespace before being
    passed to ``set_context``; the outcome is returned as JSON.
    """
    args = request.args
    raw_pairs = (token.split('=') for token in args.get('context').split(','))
    context = {key.strip(): value.strip() for key, value in raw_pairs}
    ret, response = set_context(context, args.get('session'))
    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _remove_context():
    """Remove the comma-separated ``keys`` from the session's context; reply in JSON."""
    args = request.args
    key_list = args.get('keys').split(',')
    ret, response = remove_context(key_list, args.get('session'))
    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _update_config():
    """Update the configuration from the query string and reply in JSON.

    Each argument value is coerced before being passed to ``update_config``
    as a keyword argument: ``true``/``false`` (any case) become booleans,
    all-digit strings become ``int``, ``digits.digits`` becomes ``float``,
    and anything else stays a string.
    """
    data = request.args.to_dict()
    # Iterate over a snapshot because values are replaced in place;
    # ``items()`` works on both Python 2 and 3, unlike ``iteritems()``.
    for k, v in list(data.items()):
        lowered = v.lower()
        if lowered == 'true':
            data[k] = True
        elif lowered == 'false':
            data[k] = False
        # The patterns are anchored at the end: unanchored, the integer
        # pattern also matched "1.5" or "12abc", so int() raised and the
        # float branch could never be reached.
        elif re.match(r'[0-9]+\Z', v):
            data[k] = int(v)
        elif re.match(r'[0-9]+\.[0-9]+\Z', v):
            data[k] = float(v)
        else:
            data[k] = str(v)
    ret, response = update_config(**data)
    return Response(json_encode({'ret': ret, 'response': response}),
                    mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _log():
    """Stream the file at ``LOG_CONFIG_FILE`` line by line as plain text."""
    def stream_lines():
        with open(LOG_CONFIG_FILE) as log_file:
            for line in log_file:
                yield line

    return Response(stream_lines(), mimetype='text/plain')
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _reset_session():
    """Reset the session named by the ``session`` argument, if it exists."""
    sid = request.args.get('session')

    ret, response = False, "No such session"
    if session_manager.has_session(sid):
        session_manager.reset_session(sid)
        ret, response = True, "Session reset"

    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _dump_history():
    """Call ``dump_history`` and report success or failure as JSON."""
    try:
        dump_history()
    except Exception:
        ret, response = False, "Failure"
    else:
        ret, response = True, "Success"

    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def _stats():
    """Dump the history, then return stats for the last ``lookback`` days.

    ``lookback`` defaults to 7.  On any error the reply carries
    ``ret`` = ``False`` and the error text under ``err_msg``, and the
    exception is logged.
    """
    try:
        lookback_days = int(request.args.get('lookback', 7))
        dump_history()
        response = history_stats(HISTORY_DIR, lookback_days)
        ret = True
    except Exception as ex:
        ret = False
        response = {'err_msg': str(ex)}
        logger.error(ex)

    body = json_encode({'ret': ret, 'response': response})
    return Response(body, mimetype="application/json")
项目:loving-ai    作者:opencog    | 项目源码 | 文件源码
def authenticate():
    """Build the JSON reply sent when access could not be verified (``ret`` 401)."""
    denial = {'ret': 401, 'response': {'text': 'Could not verify your access'}}
    return Response(json_encode(denial), mimetype="application/json")
项目:sms-proxy    作者:flowroute    | 项目源码 | 文件源码
def add_virtual_tn():
    """
    The VirtualTN resource endpoint for adding VirtualTN's from the pool.

    Expects a JSON body with a ``value`` of at most 18 characters (after
    conversion to ``str``).

    :raises InvalidAPIUsage: ``value`` is missing or too long, the body is
        not a JSON object, or the TN already exists in the pool
    :returns: a JSON ``Response`` confirming the added value
    """
    body = request.json
    try:
        value = str(body['value'])
    except (KeyError, TypeError):
        # TypeError covers a missing body (None) or a non-object payload.
        value = None
    # Explicit check instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would silently disable the length limit.
    if value is None or len(value) > 18:
        raise InvalidAPIUsage(
            "Required argument: 'value' (str, length <= 18)",
            payload={'reason':
                     'invalidAPIUsage'})
    virtual_tn = VirtualTN(value)
    try:
        db_session.add(virtual_tn)
        db_session.commit()
    except IntegrityError:
        # Roll back the failed insert before reporting the duplicate.
        db_session.rollback()
        msg = ("Did not add virtual TN {} to the pool "
               "-- already exists").format(value)
        log.info({"message": msg})
        raise InvalidAPIUsage(
            "Virtual TN already exists",
            payload={'reason':
                     'duplicate virtual TN'})
    return Response(
        json.dumps(
            {"message": "Successfully added TN to pool",
             "value": value}),
        content_type="application/json")
项目:sms-proxy    作者:flowroute    | 项目源码 | 文件源码
def list_virtual_tns():
    """
    The VirtualTN resource endpoint for listing VirtualTN's from the pool.

    Returns every TN with its session id, plus the pool size and how many
    TNs are available (no session id) or in use.
    """
    pool = VirtualTN.query.all()
    entries = []
    for tn in pool:
        entries.append({'value': tn.value, 'session_id': tn.session_id})
    free_count = sum(1 for tn in pool if tn.session_id is None)
    summary = {"virtual_tns": entries,
               "pool_size": len(entries),
               "available": free_count,
               "in_use": len(entries) - free_count}
    return Response(json.dumps(summary), content_type="application/json")
项目:sms-proxy    作者:flowroute    | 项目源码 | 文件源码
def list_proxy_sessions():
    """
    The ProxySession resource endpoint for listing ProxySessions
    from the pool.

    Dates are rendered as ``YYYY-MM-DD HH:MM:SS``; a session without an
    expiry date reports ``None`` for it.
    """
    stamp_format = '%Y-%m-%d %H:%M:%S'

    def describe(s):
        expiry = s.expiry_date.strftime(stamp_format) if s.expiry_date else None
        return {
            'id': s.id,
            'date_created': s.date_created.strftime(stamp_format),
            'virtual_tn': s.virtual_TN,
            'participant_a': s.participant_a,
            'participant_b': s.participant_b,
            'expiry_date': expiry}

    listing = [describe(s) for s in ProxySession.query.all()]
    return Response(
        json.dumps({"total_sessions": len(listing), "sessions": listing}),
        content_type="application/json")
项目:sms-proxy    作者:flowroute    | 项目源码 | 文件源码
def delete_session():
    """
    The ProxySession resource endpoint for ending a ProxySession and
    releasing its virtual TN.

    Expects a JSON body with ``session_id``.  The session must exist; it is
    then terminated and both participants are sent the session-end message.

    :raises InvalidAPIUsage: ``session_id`` is missing, or (404) no such
        session exists
    """
    body = request.json
    try:
        session_id = str(body['session_id'])
    except KeyError:
        raise InvalidAPIUsage(
            "Required argument: 'session_id' (str)",
            payload={'reason':
                     'invalidAPIUsage'})

    # existence check only: .one() raises when there is no matching row
    try:
        ProxySession.query.filter_by(id=session_id).one()
    except NoResultFound:
        msg = ("ProxySession {} could not be deleted because"
               " it does not exist".format(session_id))
        log.info({"message": msg,
                  "status": "failed"})
        raise InvalidAPIUsage(
            msg, status_code=404,
            payload={'reason':
                     'ProxySession not found'})

    participant_a, participant_b, virtual_tn = ProxySession.terminate(
        session_id)
    send_message(
        [participant_a, participant_b],
        virtual_tn.value,
        SESSION_END_MSG,
        session_id,
        is_system_msg=True)

    msg = "Ended session {} and released {} back to pool".format(
        session_id, virtual_tn.value)
    log.info({"message": msg, "status": "succeeded"})

    result = {"message": "Successfully ended the session.",
              "status": "succeeded",
              "session_id": session_id}
    return Response(json.dumps(result), content_type="application/json")
项目:hgvs-eval    作者:biocommons    | 项目源码 | 文件源码
def getSwaggerJson():
    """Serve ``./messages.swagger.json`` as an ``application/json`` response.

    The file is read through a context manager so the handle is closed even
    if reading fails, and the content is kept in a local name that does not
    shadow the ``json`` module.
    """
    swagger_path = os.path.join("./", "messages.swagger.json")
    with open(swagger_path, "r") as json_file:
        swagger_text = json_file.read()
    resp = Response(response=swagger_text,
                    status=200,
                    mimetype="application/json")
    return resp
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def corpora_list():
    """Return sentences as JSON, filtered by the optional query arguments.

    ``page`` defaults to 1; ``query`` and ``category`` default to ``None``;
    ``doc`` is URL-unquoted when present.
    """
    args = request.args
    page = args.get('page', 1)
    query = args.get('query')
    category = args.get('category')
    doc = unquote(args['doc']) if 'doc' in args else None

    sentences = get_sentences(page=page, query=query, category=category, document=doc)
    return Response(json.dumps(sentences), mimetype='application/json')
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def corpora_docs_list():
    """Return the result of ``get_doc_list()`` serialised as JSON."""
    body = json.dumps(get_doc_list())
    return Response(body, mimetype='application/json')
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def docs_list():
    """Return the document queue as JSON, filtered by optional arguments.

    ``page`` defaults to 1 and ``doc_type`` to ``None``.  When ``state`` is
    given, the filter also includes the related state: pending adds
    processing, processed adds error.  Without ``state`` no state filter is
    built and ``states`` is passed as ``None``.
    """
    page = 1
    doc_type = None
    # Previously ``states`` was only bound inside the ``state`` branch, so a
    # request without ``state`` crashed with a NameError.
    # NOTE(review): assumes get_queue_list treats states=None as "no filter"
    # — confirm against its implementation.
    states = None
    if 'page' in request.args:
        page = request.args['page']

    if 'doc_type' in request.args:
        doc_type = request.args['doc_type']

    if 'state' in request.args:
        state = request.args['state']
        states = [state]
        if state == ST_PENDING:
            states.append(ST_PROCESSING)

        if state == ST_PROCESSED:
            states.append(ST_ERROR)

    return Response(json.dumps(get_queue_list(page=page, doc_type=doc_type, states=states), default=datetime_handler),
                    mimetype='application/json')
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def geocoding_list():
    """Return the geocoding list as JSON for the optional ``activity_id`` / ``queue_id``."""
    args = request.args
    results = get_geocoding_list(
        activity_id=args.get('activity_id'),
        queue_id=args.get('queue_id'))
    body = json.dumps(results, default=datetime_handler)
    return Response(body, mimetype='application/json')
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def activity_list():
    """Return the activity list as JSON for the optional ``document_id`` argument."""
    activities = get_activity_list(document_id=request.args.get('document_id'))
    body = json.dumps(activities, default=datetime_handler)
    return Response(body, mimetype='application/json')
项目:flyby    作者:Skyscanner    | 项目源码 | 文件源码
def haproxy_config():
    """Return ``Haproxy().config`` with the ``application/txt`` mimetype."""
    config_text = Haproxy().config
    return flask.Response(config_text, mimetype='application/txt')
项目:roomfinder    作者:GuillaumeMorini    | 项目源码 | 文件源码
def process_demoroom_members():
    """Handle the demo-room endpoint.

    POST: send the welcome message to the ``email`` form field and answer
    201 with the reply, or 400 with an error object when a ``KeyError`` is
    raised.  Any other method gets a plain "OK" with status 200.
    """
    # Verify that the request is properly authorized
    #authz = valid_request_check(request)
    #if not authz[0]:
    #    return authz[1]

    if request.method != "POST":
        # demo_room_members = get_membership()
        # resp = Response(
        #     json.dumps(demo_room_members, sort_keys=True, indent = 4, separators = (',', ': ')),
        #     content_type='application/json',
        #     status=status)
        return Response("OK", status=200)

    form = request.form
    try:
        sys.stderr.write("Adding %s to demo room.\n" % (form["email"]))
        reply = send_welcome_message(form["email"])
        return Response(reply, content_type='application/json', status=201)
    except KeyError:
        error = {"Error":"API Expects dictionary object with single element and key of 'email'"}
        return Response(json.dumps(error), content_type='application/json', status=400)
项目:connectbox-pi    作者:ConnectBox    | 项目源码 | 文件源码
def remove_authorised_client(ip_addr_str=None):
    """Forgets that a client has been seen recently to allow running tests.

    The client is identified by the ``X-Forwarded-For`` request header;
    ``ip_addr_str`` is not used.  Always answers with an empty 204.
    """
    forwarded_for = request.headers["X-Forwarded-For"]
    # drop the entry if present; unknown clients are simply ignored
    _client_map.pop(forwarded_for, None)
    return Response(status=204)
项目:sysu-ctf    作者:ssst0n3    | 项目源码 | 文件源码
def custom_css():
    """Serve the value of the ``css`` config entry as ``text/css``."""
    stylesheet = get_config("css")
    return Response(stylesheet, mimetype='text/css')


# Static HTML files
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def post_json(self, endpoint: str, data: JsonDict) -> Response:
        """POST ``data`` serialised as JSON to ``endpoint`` via ``self.client``."""
        payload = json.dumps(data)
        return self.client.post(endpoint, data=payload, content_type="application/json")
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def test_permalinks_work(self):
        """A prediction returns a slug that ``/permadata`` resolves to the stored data."""
        demo_db = InMemoryDemoDatabase()
        app = make_app(build_dir=self.TEST_DIR, demo_db=demo_db)
        app.predictors = {"counting": CountingPredictor()}
        app.testing = True
        client = app.test_client()

        def post(endpoint: str, data: JsonDict) -> Response:
            body = json.dumps(data)
            return client.post(endpoint, content_type="application/json", data=body)

        request_data = {"some": "input"}

        # a prediction must succeed and hand back a slug
        prediction = post("/predict/counting", data=request_data)
        assert prediction.status_code == 200
        prediction_json = json.loads(prediction.get_data())
        slug = prediction_json.get("slug")
        assert slug is not None

        # an unknown slug is rejected
        rejected = post("/permadata", data={"slug": "not the right slug"})
        assert rejected.status_code == 400

        # the real slug returns the model name, request and response
        found = post("/permadata", data={"slug": slug})
        assert found.status_code == 200
        permadata = json.loads(found.get_data())
        assert set(permadata.keys()) == {"modelName", "requestData", "responseData"}
        assert permadata["modelName"] == "counting"
        assert permadata["requestData"] == request_data
        assert permadata["responseData"] == prediction_json
项目:blog-code-examples    作者:fullstackpython    | 项目源码 | 文件源码
def show_page(page):
    """Render ``<page>.html`` for a valid page name, otherwise answer 404.

    A name is valid when it has at least ``MIN_PAGE_NAME_LENGTH`` characters
    and, lower-cased, matches ``^[a-z]+$``.  Any failure (invalid name or an
    error while rendering) is reported to Rollbar and answered with a
    "404 Not Found" body that now also carries HTTP status 404 (previously
    it was sent with status 200).

    :param page: the page name
    :returns: the rendered template, or a 404 ``Response``
    """
    try:
        valid_length = len(page) >= MIN_PAGE_NAME_LENGTH
        valid_name = re.match('^[a-z]+$', page.lower()) is not None
        if valid_length and valid_name:
            return render_template("{}.html".format(page))
        msg = "Sorry, couldn't find page with name {}".format(page)
        raise NotFound(msg)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so SystemExit and
        # KeyboardInterrupt are not swallowed.
        rollbar.report_exc_info()
        return Response("404 Not Found", status=404)
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def make_response(self, response_data, headers):
        """Build the CSV ``Response`` and add the download headers to ``headers``.

        ``headers`` is modified in place (``Content-Disposition`` and
        ``Content-Type``) and returned together with the response.
        """
        # TODO: pass in optional filename
        filename = "response.csv"
        disposition = 'attachment; filename="{}"'.format(filename)
        content_type = "{}; charset=utf-8".format(CSV_CONTENT_TYPE)
        headers["Content-Disposition"] = disposition
        headers["Content-Type"] = content_type

        csv_body = self.csvify(response_data)
        return Response(csv_body, mimetype=CSV_CONTENT_TYPE), headers
项目:microcosm-flask    作者:globality-corp    | 项目源码 | 文件源码
def csvify(self, response_data):
        """
        Generate the CSV content for a JSON-like object (a `dict`, or a `dict`
        whose "items" key holds a list of `dicts`).

        Columns follow `self.response_schema.csv_column_order` when the schema
        defines it, followed by any remaining fields of the first item;
        otherwise the first item's own field order is used.  The schema's
        column list is copied, never modified.

        Yields the whole CSV document as a single string.

        """
        if "items" in response_data:
            list_response_data = response_data["items"]
        else:
            list_response_data = [response_data]

        # An empty "items" list still produces a header row instead of
        # failing on the first-item lookup.
        if list_response_data:
            response_fields = list(list_response_data[0].keys())
        else:
            response_fields = []

        column_order = getattr(self.response_schema, "csv_column_order", None)
        if column_order is None:
            # We should still be able to return a CSV even if no column order has been specified
            column_names = response_fields
        else:
            # Work on a copy: extending the schema's own list would leak the
            # extra fields of this response into every later call.
            column_names = list(column_order)
            # The column order may be only partially specified
            column_names.extend([field_name for field_name in response_fields if field_name not in column_names])

        output = StringIO()
        csv_writer = writer(output, quoting=QUOTE_MINIMAL)
        csv_writer.writerow(column_names)
        for item in list_response_data:
            csv_writer.writerow([item[column] for column in column_names])
        # Ideally we'd want to `yield` each line to stream the content
        # But something downstream seems to break streaming
        yield output.getvalue()
项目:endeavour    作者:h2020-endeavour    | 项目源码 | 文件源码
def setup(self):
        """Register the ``/anomaly`` and ``/monitor`` POST routes on ``self.app``.

        Both routes accept only ``Content-Type: application/json``.  The
        parsed JSON body is handed to the matching ``self.app.monitor``
        method and the outcome is rendered by ``self.handle_response``.
        Any other content type is answered with 415 and the raw request
        body echoed back.
        """
        def dispatch(process):
            # ``.get`` so a missing header means "unsupported", not KeyError.
            if request.headers.get('Content-Type') != 'application/json':
                # Echo the raw body text: the parsed JSON is None or a dict,
                # so concatenating it to a string raised TypeError.
                raw_body = request.get_data(as_text=True)
                return Response("Unsupported media type\n" + raw_body, status=415)
            data = request.json
            success = process(data)
            return self.handle_response(success, data)

        @self.app.route('/anomaly', methods = ['POST'])
        def api_anomaly():
            return dispatch(self.app.monitor.process_anomaly_data)

        @self.app.route('/monitor', methods = ['POST'])
        def api_monitor():
            return dispatch(self.app.monitor.process_monitor_flows)
项目:endeavour    作者:h2020-endeavour    | 项目源码 | 文件源码
def handle_response(self, success, data):
        """Return "OK" (200) or "BAD REQUEST" (400) followed by ``data`` as JSON."""
        json_data = json.dumps(data)
        prefix, status_code = ("OK\n", 200) if success else ("BAD REQUEST\n", 400)
        return Response(prefix + json_data, status=status_code)