Python flask.current_app 模块,response_class() 实例源码

我们从 Python 开源项目中提取了以下 25 个代码示例，用于说明如何使用 flask.current_app.response_class()。

项目:blog-a    作者:richardchien    | 项目源码 | 文件源码
def support_jsonp(f):
    """
    Wrap a view's JSON output in a JSONP callback when one is requested.

    When the query string carries a non-empty ``callback`` argument, the
    wrapped view is called, its response body is decoded as UTF-8 and the
    result is sent as ``callback(<body>)``. Without a callback the view's
    return value is passed through untouched.

    https://gist.github.com/richardchien/7b7c2727feb3e8993845a3ce61bad808

    :param f: view function; its return value must expose a bytes ``data``
        attribute when a callback is requested
    :return: the decorated view function
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)

        content = str(callback) + '(' + f(*args, **kwargs).data.decode('utf-8') + ')'
        # The padded payload is a script, not a JSON document, so it is
        # declared as JavaScript rather than 'application/json'.
        return current_app.response_class(content, mimetype='application/javascript')

    return decorated_function
项目:dune    作者:onyb    | 项目源码 | 文件源码
def jsonp(func):
    """
    Wrap JSONified output for JSONP requests.

    When the query string carries a non-empty ``callback`` argument, the
    wrapped view's response body is sent as ``callback(<body>)`` with the
    ``application/javascript`` mimetype. Without a callback the view's return
    value is passed through untouched.

    :param func: view function returning a response object
    :return: the decorated view function
    """

    @wraps(func)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return func(*args, **kwargs)

        # Decode the body explicitly: str() on a bytes body yields the
        # "b'...'" repr on Python 3 and would corrupt the payload.
        data = func(*args, **kwargs).get_data(as_text=True)
        content = str(callback) + '(' + data + ')'
        return current_app.response_class(
            content,
            mimetype='application/javascript'
        )

    return decorated_function
项目:DiggerPlus    作者:DiggerPlus    | 项目源码 | 文件源码
def status(code):
    """
    Build a response factory bound to the HTTP status ``code``.

    The returned callable accepts either positional or keyword arguments
    (never both). A single positional argument is used as the payload
    directly; otherwise the payload is the tuple of positional arguments or
    the dict of keyword arguments. A truthy payload is JSON-encoded and
    followed by a newline; a falsy payload produces a response with no body.

    :param code: status code passed to the response class
    :return: callable producing ``current_app.response_class`` instances
    """
    def response(*args, **kwargs):
        if args and kwargs:
            raise TypeError("Cannot pass both args and kwargs.")

        payload = args[0] if len(args) == 1 else (args or kwargs)
        body = (json.dumps(payload), '\n') if payload else None

        return current_app.response_class(
            response=body,
            status=code,
            mimetype=current_app.config['JSONIFY_MIMETYPE']
        )
    return response
项目:flask-sse    作者:singingwolfboy    | 项目源码 | 文件源码
def stream(self):
        """
        Serve a server-sent event stream for one channel.

        The channel name comes from the "channel" query parameter and falls
        back to "sse" when that parameter is missing or empty. No
        :mailheader:`Last-Event-ID` request header is read here.
        """
        selected = request.args.get('channel') or 'sse'

        @stream_with_context
        def generator():
            # Each message is rendered through str() before being sent.
            for event in self.messages(channel=selected):
                yield str(event)

        events = generator()
        return current_app.response_class(events, mimetype='text/event-stream')
项目:colab-server    作者:colab-chat    | 项目源码 | 文件源码
def stream(self):
        """
        Serve a server-sent event stream built from ``self.messages()``.

        Every message becomes one event: an ``event:message`` line followed
        by one ``data:`` line per line of the message, terminated by a blank
        line. No :mailheader:`Last-Event-ID` request header is read here.
        """
        current_app.logger.debug('in ServerSentEventsBlueprint.stream')

        @stream_with_context
        def generator():
            for message in self.messages():
                frame = ["event:{value}".format(value='message')]
                frame.extend("data:{value}".format(value=line)
                             for line in message.splitlines())
                yield "\n".join(frame) + "\n\n"

        return current_app.response_class(generator(), mimetype='text/event-stream')
项目:flask-restler    作者:klen    | 项目源码 | 文件源码
def to_json_response(self, response, headers=None):
        """Turn a plain value into a JSON Flask response.

        When ``self.raw`` is truthy the value is returned unchanged.
        Otherwise it is serialized with ``dumps(..., indent=2)`` into an
        ``application/json`` response, and ``headers`` (if truthy) are
        appended to that response's headers.
        """
        if self.raw:
            return response

        body = dumps(response, indent=2)
        flask_response = current_app.response_class(body, mimetype='application/json')
        if headers:
            flask_response.headers.extend(headers)
        return flask_response
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def jsonpify(f):
    """Wrap JSONified output for JSONP.

    When the query string carries a non-empty ``callback`` argument, the
    wrapped view's response body is sent as ``callback(<body>)`` with the
    ``application/javascript`` mimetype. Without a callback the view's return
    value is passed through untouched.

    :param f: view function returning a response object
    :return: the decorated view function
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', False)
        if not callback:
            return f(*args, **kwargs)

        # Decode the body explicitly: str() on a bytes body yields the
        # "b'...'" repr on Python 3 and would corrupt the payload.
        body = f(*args, **kwargs).get_data(as_text=True)
        content = str(callback) + '(' + body + ')'
        return current_app.response_class(content,
                                          mimetype='application/javascript')
    return decorated_function
项目:logtoes    作者:adhorn    | 项目源码 | 文件源码
def json_response(fn):
    """Decorate a view so that its return value is sent as JSON.

    A return value that is already an instance of the application's response
    class is passed through untouched. Lists and tuples are wrapped as
    ``{'items': ...}``. Everything else is serialized as-is with
    ``ModelJSONEncoder``.
    """
    @wraps(fn)
    def wrapped(*args, **kwargs):
        outcome = fn(*args, **kwargs)
        response_cls = current_app.response_class
        if isinstance(outcome, response_cls):
            return outcome

        payload = {'items': outcome} if isinstance(outcome, (list, tuple)) else outcome
        body = json.dumps(payload, cls=ModelJSONEncoder)
        return response_cls(body, mimetype='application/json')
    return wrapped
项目:isthislegit    作者:duo-labs    | 项目源码 | 文件源码
def get_actions():
    """
    Return the configured actions as a JSON array.

    Each entry of ``actions`` is rendered with ``to_dict`` for the current
    ``g.domain`` and ``g.user``.
    """
    serialized = [
        action.to_dict(domain=g.domain, user=g.user) for action in actions
    ]
    body = json.dumps(serialized)
    return current_app.response_class(body, mimetype='application/json')
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def send_stream():
    """Return a ``text/event-stream`` response backed by an ``EvtStream``.

    The channel is the session's ``sid`` attribute, or 'common' when the
    session has none. The last seen event id is read from the
    ``Last-Event-Id`` header, falling back to the ``lastid`` query argument.
    """
    conn = ConnectionPool.get_connection()

    sid = getattr(session, 'sid', None)
    channel = 'common' if sid is None else sid

    last_id = request.headers.get('Last-Event-Id')
    if last_id is None:
        last_id = request.args.get('lastid')

    events = EvtStream(conn, channel, last_id)
    return current_app.response_class(
        events,
        direct_passthrough=True,
        mimetype='text/event-stream')
项目:covador    作者:baverman    | 项目源码 | 文件源码
def error_handler(ctx):
    """Build a 400 JSON response whose body is ``error_to_json(ctx.exception)``."""
    body = error_to_json(ctx.exception)
    return current_app.response_class(body, status=400,
                                      mimetype='application/json')
项目:partycrasher    作者:naturalness    | 项目源码 | 文件源码
def home(filename=None):
    """Serve the web UI: a templated stylesheet, a static file, or the index.

    * An existing file under ``ui/app/`` whose name contains 'main.css' is
      rendered as a template and returned as ``text/css``.
    * Any other existing file under ``ui/app/`` is sent as a static file.
    * A missing ``views/...html`` file yields an empty 404.
    * Everything else renders ``index.html``.
    """
    template_vars = {
        'bower': full_url_for('home') + 'bower_components',
        'node_modules': full_url_for('home') + 'node_modules',
        'project_names': list(crasher.projects),
        'type_names': list(crasher.types),
        'thresholds': [str(thresh) for thresh in crasher.thresholds],
        'basehref': full_url_for('home'),
        'restbase': full_url_for('root'),
        'default_threshold': str(crasher.default_threshold),
        'fixed_fields': list(crasher.config.UserInterface.fixed_summary_fields),
    }

    if filename:
        if os.path.exists(relative('ui/app/', filename)):
            if 'main.css' in filename:
                stylesheet = render_template(filename, **template_vars)
                return current_app.response_class(stylesheet,
                                                  mimetype="text/css")
            # Anything else that exists on disk is a static file.
            return send_from_directory(relative('ui/app/'), filename)

        if 'views/' in filename and filename.endswith('.html'):
            # Answer 404 for a missing view. Without this, Angular could try
            # to load the index page in an infinite loop.
            return '', 404

    # Otherwise, it's a route in the web app.
    return render_template('index.html', **template_vars)
项目:partycrasher    作者:naturalness    | 项目源码 | 文件源码
def jsonify_resource(resource):
    """Serialize ``resource`` into an ``application/json`` response.

    The output is indented by two spaces when the app's
    ``JSONIFY_PRETTYPRINT_REGULAR`` setting is truthy and the request is not
    flagged as XHR; otherwise it is compact.
    """
    pretty = (current_app.config['JSONIFY_PRETTYPRINT_REGULAR']
              and not request.is_xhr)
    body = json.dumps(resource, indent=2 if pretty else None)
    return current_app.response_class(body, mimetype='application/json')
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_blockpage_count():
    """Relay the per-country blockpage count document from upstream.

    Reads the ``probe_cc`` query argument and fetches
    ``blockpage-count-<probe_cc>.json`` relative to the configured
    ``CENTRIFUGATION_BASE_URL``. An upstream 404 is answered with an empty
    ``{"results": []}`` document; any other upstream body is relayed as-is.

    :raises werkzeug.exceptions.BadRequest: if ``probe_cc`` is missing
    """
    # Imported here so the block is self-contained; werkzeug ships with Flask.
    from werkzeug.exceptions import BadRequest

    probe_cc = request.args.get('probe_cc')
    if probe_cc is None:
        # A missing parameter is a client error (400). A bare Exception would
        # surface as an HTTP 500.
        raise BadRequest('Missing required query parameter: probe_cc')

    url = urljoin(current_app.config['CENTRIFUGATION_BASE_URL'],
                  'blockpage-count-%s.json' % probe_cc)
    resp = requests.get(url)
    if resp.status_code == 404:
        resp_text = json.dumps({'results': []})
    else:
        resp_text = resp.text
    return current_app.response_class(
        resp_text,
        mimetype=current_app.config['JSONIFY_MIMETYPE']
    )
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_measurement_count_by_country():
    """Relay ``count-by-country.json`` from ``CENTRIFUGATION_BASE_URL``.

    The upstream body is returned verbatim with the app's
    ``JSONIFY_MIMETYPE``.
    """
    base_url = current_app.config['CENTRIFUGATION_BASE_URL']
    upstream = requests.get(urljoin(base_url, 'count-by-country.json'))
    json_mimetype = current_app.config['JSONIFY_MIMETYPE']
    return current_app.response_class(upstream.text, mimetype=json_mimetype)
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def api_private_measurement_count_total():
    """Relay ``count-total.json`` from ``CENTRIFUGATION_BASE_URL``.

    The upstream body is returned verbatim with the app's
    ``JSONIFY_MIMETYPE``.
    """
    base_url = current_app.config['CENTRIFUGATION_BASE_URL']
    upstream = requests.get(urljoin(base_url, 'count-total.json'))
    json_mimetype = current_app.config['JSONIFY_MIMETYPE']
    return current_app.response_class(upstream.text, mimetype=json_mimetype)
项目:NZ-ORCID-Hub    作者:Royal-Society-of-New-Zealand    | 项目源码 | 文件源码
def yamlfy(*args, **kwargs):
    """Create a response in YAML, just like jsonify does it for JSON.

    Accepts either positional or keyword arguments, never both. A single
    positional argument is dumped directly; otherwise the tuple of positional
    arguments (or, when there are none, the dict of keyword arguments) is
    dumped. The YAML text is followed by a newline and sent as ``text/yaml``.

    :raises TypeError: if both positional and keyword arguments are given
    """
    if args and kwargs:
        raise TypeError('yamlfy() behavior undefined when passed both args and kwargs')

    # A lone positional argument is passed straight to yaml.dump().
    payload = args[0] if len(args) == 1 else (args or kwargs)

    body = (yaml.dump(payload), '\n')
    return current_app.response_class(body, mimetype="text/yaml")
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def jsonify(mongo_doc, status=200, headers=None):
    """Serialize a Mongo document with ``dumps`` into a JSON Flask response.

    :param mongo_doc: document passed to ``dumps``
    :param status: HTTP status of the response
    :param headers: headers passed through to the response class
    """
    body = dumps(mongo_doc)
    return current_app.response_class(
        body,
        status=status,
        headers=headers,
        mimetype='application/json',
    )
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def bsonify(mongo_doc, status=200, headers=None):
    """Encode a Mongo document as BSON and wrap it in a Flask response.

    :param mongo_doc: document passed to ``bson.BSON.encode``
    :param status: HTTP status of the response
    :param headers: headers passed through to the response class
    """
    import bson

    encoded = bson.BSON.encode(mongo_doc)
    return current_app.response_class(
        encoded,
        status=status,
        headers=headers,
        mimetype='application/bson',
    )
项目:ztool-backhend    作者:Z-Tool    | 项目源码 | 文件源码
def jsonp(f):
    """Re-wrap a view's response, adding JSONP padding when requested.

    The wrapped view may return either a response object or a
    ``(response, status)`` tuple; a bare response is sent with status 200.
    When the query string carries a non-empty ``callback`` argument the body
    is sent as ``callback(<body>)`` with the ``application/javascript``
    mimetype. Otherwise the body is re-sent unchanged as ``application/json``.

    :param f: view function returning a response or ``(response, status)``
    :return: the decorated view function
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', None)
        rtn = f(*args, **kwargs)
        if isinstance(rtn, tuple):
            inner, status = rtn[0], rtn[1]
        else:
            inner, status = rtn, 200

        if not callback:
            return current_app.response_class(inner.data, mimetype='application/json', status=status)

        # Formatting the raw bytes body into a str would embed the "b'...'"
        # repr on Python 3, so decode it first.
        content = '{0}({1})'.format(str(callback), inner.get_data(as_text=True))
        # A padded payload is a script, not a JSON document.
        return current_app.response_class(content, mimetype='application/javascript', status=status)

    return decorated_function
项目:ztool-backhend-mongo    作者:Z-Tool    | 项目源码 | 文件源码
def jsonp(f):
    """Re-wrap a view's response, adding JSONP padding when requested.

    The wrapped view may return either a response object or a
    ``(response, status)`` tuple; a bare response is sent with status 200.
    When the query string carries a non-empty ``callback`` argument the body
    is sent as ``callback(<body>)`` with the ``application/javascript``
    mimetype. Otherwise the body is re-sent unchanged as ``application/json``.

    :param f: view function returning a response or ``(response, status)``
    :return: the decorated view function
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        callback = request.args.get('callback', None)
        rtn = f(*args, **kwargs)
        if isinstance(rtn, tuple):
            inner, status = rtn[0], rtn[1]
        else:
            inner, status = rtn, 200

        if not callback:
            return current_app.response_class(inner.data, mimetype='application/json', status=status)

        # NOTE: this used to run eval() on the body. That executes the text
        # as Python, fails on JSON's true/false/null, and emits a Python repr
        # instead of JSON. Decoding the bytes is what the padding needs.
        content = '{0}({1})'.format(str(callback), inner.get_data(as_text=True))
        # A padded payload is a script, not a JSON document.
        return current_app.response_class(content, mimetype='application/javascript', status=status)

    return decorated_function
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def jsonpify(*args, **kw):
    """Build a :func:`jsonify` response, wrapped as JSON-P on request.

    The arguments are forwarded to :func:`jsonify`. If the request has a
    non-empty ``'callback'`` query parameter, the JSON document is re-read
    and sent as ``callback({"meta": ..., "data": ...})``, where ``meta``
    carries the headers and the status code. Without a callback the
    :func:`jsonify` response is used as-is.

    Two special keyword arguments are removed before forwarding:

    * :data:`_HEADERS` -- a dictionary of headers to set on the response;
      these overwrite headers already present on it.
    * :data:`_STATUS` -- the integer status code of the response; defaults
      to :http:status:`200`.

    """
    # HACK The headers and status code travel from the view function to this
    # function through keyword arguments, because the mimerender library has
    # no way of making them known to its rendering functions.
    headers = kw.pop(_HEADERS, {})
    status_code = kw.pop(_STATUS, 200)
    response = jsonify(*args, **kw)

    callback = request.args.get('callback', False)
    if callback:
        # Re-read the JSON document so it can be nested inside the JSONP
        # envelope.
        payload = json.loads(response.data)
        # A JSONP response is valid Javascript but not valid JSON, so the
        # 'Content-Type' is forced to 'application/javascript' instead of the
        # 'application/json' Flask uses for JSON responses.
        headers['Content-Type'] = 'application/javascript'
        # Headers and status code are exposed as metadata in the envelope.
        if headers is not None:
            meta = _headers_to_json(headers)
        else:
            meta = {}
        meta['status'] = status_code
        envelope = dict(meta=meta, data=payload)
        content = '{0}({1})'.format(callback, json.dumps(envelope))
        response = current_app.response_class(
            content, mimetype='application/javascript')

    # Mirror the headers on the HTTP response itself as well.
    if headers:
        set_headers(response, headers)
    response.status_code = status_code
    return response
项目:ooni-measurements    作者:TheTorProject    | 项目源码 | 文件源码
def get_measurement(measurement_id):
    """Return the raw JSON blob of a single measurement.

    The id is matched against ``RE_MSM_ID`` and its first group is used as
    the numeric ``msm_no``. The measurement's location (file name, frame
    offset/size, offset/size inside the frame) is looked up in the database.
    The LZ4 frame is then fetched with an HTTP Range request, decompressed
    and sliced down to the measurement itself.

    :param measurement_id: identifier expected to match ``RE_MSM_ID``
    :return: response carrying the undecoded measurement bytes with the
        app's ``JSONIFY_MIMETYPE``
    :raises BadRequest: if the id does not match or no row is found
    :raises RuntimeError: if the fetched frame or the decompressed slice
        does not have the expected size or is not delimited by ``{`` / ``}``
    """
    # XXX this query is SUPER slow
    m = RE_MSM_ID.match(measurement_id)
    if not m:
        raise BadRequest("Invalid measurement_id")
    msm_no = int(m.group(1))

    # Measurement -> Report -> Autoclaved: collect where the blob lives.
    q = current_app.db_session.query(
            Measurement.report_no.label('report_no'),
            Measurement.frame_off.label('frame_off'),
            Measurement.frame_size.label('frame_size'),
            Measurement.intra_off.label('intra_off'),
            Measurement.intra_size.label('intra_size'),
            Report.report_no.label('r_report_no'),
            Report.autoclaved_no.label('r_autoclaved_no'),
            Autoclaved.filename.label('a_filename'),
            Autoclaved.autoclaved_no.label('a_autoclaved_no'),

    ).filter(Measurement.msm_no == msm_no)\
        .join(Report, Report.report_no == Measurement.report_no)\
        .join(Autoclaved, Autoclaved.autoclaved_no == Report.autoclaved_no)
    try:
        msmt = q.one()
    except exc.MultipleResultsFound:
        # Duplicates are tolerated: log them and fall back to the first row.
        current_app.logger.warning("Duplicate rows for measurement_id: %s" % measurement_id)
        msmt = q.first()
    except exc.NoResultFound:
        # XXX we should actually return a 404 here
        raise BadRequest("No measurement found")

    # Usual size of LZ4 frames is 256kb of decompressed text.
    # Largest size of LZ4 frame was ~55Mb compressed and ~56Mb decompressed. :-/
    # The Range end is inclusive, hence the "- 1".
    range_header = "bytes={}-{}".format(msmt.frame_off, msmt.frame_off + msmt.frame_size - 1)
    r = requests.get(urljoin(current_app.config['AUTOCLAVED_BASE_URL'], msmt.a_filename),
            headers={"Range": range_header})
    r.raise_for_status()
    blob = r.content
    # Reject a response that is not exactly the requested frame.
    if len(blob) != msmt.frame_size:
        raise RuntimeError('Failed to fetch LZ4 frame', len(blob), msmt.frame_size)
    # Decompress the whole frame, then cut out this measurement's bytes.
    blob = lz4framed.decompress(blob)[msmt.intra_off:msmt.intra_off+msmt.intra_size]
    # Sanity check: expected length, and the slice starts/ends like a JSON object.
    if len(blob) != msmt.intra_size or blob[:1] != b'{' or blob[-1:] != b'}':
        raise RuntimeError('Failed to decompress LZ4 frame to measurement.json', len(blob), msmt.intra_size, blob[:1], blob[-1:])
    # There is no replacement of `measurement_id` with `msm_no` or anything
    # else to keep sanity. Maybe it'll happen as part of orchestration update.
    # Also, blob is not decoded intentionally to save CPU
    return current_app.response_class(
        blob,
        mimetype=current_app.config['JSONIFY_MIMETYPE']
    )
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def send_file(self, filename, base='fs', version=-1, cache_for=31536000):
        """Return an instance of the :attr:`~flask.Flask.response_class`
        containing the named file, and implement conditional GET semantics
        (using :meth:`~werkzeug.wrappers.ETagResponseMixin.make_conditional`).

        .. code-block:: python

            @app.route('/uploads/<path:filename>')
            def get_upload(filename):
                return mongo.send_file(filename)

        :param str filename: the filename of the file to return
        :param str base: the base name of the GridFS collections to use
        :param int version: if positive, return the Nth revision of the file
           identified by filename; if negative, return the Nth most recent
           revision. If no such version exists, return with HTTP status 404.
        :param int cache_for: number of seconds that browsers should be
           instructed to cache responses
        :raises TypeError: if ``base``, ``version`` or ``cache_for`` has the
           wrong type
        """
        if not isinstance(base, text_type):
            raise TypeError('"base" must be string or unicode')
        if not isinstance(version, num_type):
            raise TypeError('"version" must be an integer')
        if not isinstance(cache_for, num_type):
            raise TypeError('"cache_for" must be an integer')

        storage = GridFS(self.db, base)

        try:
            fileobj = storage.get_version(filename=filename, version=version)
        except NoFile:
            abort(404)

        # mostly copied from flask/helpers.py, with
        # modifications for GridFS
        data = wrap_file(request.environ, fileobj, buffer_size=1024 * 256)
        response = current_app.response_class(
            data,
            mimetype=fileobj.content_type,
            direct_passthrough=True)
        response.content_length = fileobj.length
        response.last_modified = fileobj.upload_date
        response.set_etag(fileobj.md5)
        response.cache_control.max_age = cache_for
        # Werkzeug spells the "s-maxage" directive ``s_maxage``. The former
        # ``s_max_age`` only set a stray attribute and never reached the
        # Cache-Control header.
        response.cache_control.s_maxage = cache_for
        response.cache_control.public = True
        response.make_conditional(request)
        return response
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def send_file(self, filename, base='fs', version=-1, cache_for=31536000):
        """Return an instance of the :attr:`~flask.Flask.response_class`
        containing the named file, and implement conditional GET semantics
        (using :meth:`~werkzeug.wrappers.ETagResponseMixin.make_conditional`).

        .. code-block:: python

            @app.route('/uploads/<path:filename>')
            def get_upload(filename):
                return mongo.send_file(filename)

        :param str filename: the filename of the file to return
        :param str base: the base name of the GridFS collections to use
        :param int version: if positive, return the Nth revision of the file
           identified by filename; if negative, return the Nth most recent
           revision. If no such version exists, return with HTTP status 404.
        :param int cache_for: number of seconds that browsers should be
           instructed to cache responses
        :raises TypeError: if ``base``, ``version`` or ``cache_for`` has the
           wrong type
        """
        if not isinstance(base, text_type):
            raise TypeError('"base" must be string or unicode')
        if not isinstance(version, num_type):
            raise TypeError('"version" must be an integer')
        if not isinstance(cache_for, num_type):
            raise TypeError('"cache_for" must be an integer')

        storage = GridFS(self.db, base)

        try:
            fileobj = storage.get_version(filename=filename, version=version)
        except NoFile:
            abort(404)

        # mostly copied from flask/helpers.py, with
        # modifications for GridFS
        data = wrap_file(request.environ, fileobj, buffer_size=1024 * 256)
        response = current_app.response_class(
            data,
            mimetype=fileobj.content_type,
            direct_passthrough=True)
        response.content_length = fileobj.length
        response.last_modified = fileobj.upload_date
        response.set_etag(fileobj.md5)
        response.cache_control.max_age = cache_for
        # Werkzeug spells the "s-maxage" directive ``s_maxage``. The former
        # ``s_max_age`` only set a stray attribute and never reached the
        # Cache-Control header.
        response.cache_control.s_maxage = cache_for
        response.cache_control.public = True
        response.make_conditional(request)
        return response