Python flask.request 模块,content_type 属性实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用flask.request.content_type(注意:content_type 是属性而不是方法,使用时不带括号)

项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def post(current_user, bucket_id):
    """
    Store a new item in one of the current user's Buckets.

    The request must be sent as ``application/json`` and carry a JSON object
    with a non-empty ``name``; ``description`` is optional.

    :param current_user: User
    :param bucket_id: Bucket Id
    :return: Http Response
    """
    if request.content_type != 'application/json':
        return response('failed', 'Content-type must be application/json', 401)

    data = request.get_json()
    # get_json() returns None for a JSON ``null`` body and a non-dict for
    # arrays/scalars; treat those like a payload without a name instead of
    # crashing on ``.get``.
    item_name = data.get('name') if isinstance(data, dict) else None
    if not item_name:
        return response('failed', 'No name or value attribute found', 401)

    # Get the user Bucket
    bucket = get_user_bucket(current_user, bucket_id)
    if bucket is None:
        # str() keeps the message building safe if the route hands over an int id.
        return response('failed', 'User has no Bucket with Id ' + str(bucket_id), 202)

    # Save the Bucket Item into the Database (the name is stored lower-cased)
    item = BucketItem(item_name.lower(), data.get('description', None), bucket.id)
    item.save()
    return response_with_bucket_item('success', item, 200)
项目:heroku-python-boilerplate    作者:chavli    | 项目源码 | 文件源码
def validate_jpeg_binary(func):
    """Decorator: only call ``func`` when the request carries a JPEG body.

    The declared content type must be ``image/jpeg`` and the request body
    must pass ``imghdr.test_jpeg``; otherwise an ``ErrorResponseJson``
    response is returned and ``func`` is not invoked.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        declared_type = request.content_type
        if declared_type != "image/jpeg":
            message = "invalid content type: {}".format(declared_type)
            return ErrorResponseJson(message).make_response()

        detected_kind = imghdr.test_jpeg(request.data, None)
        if detected_kind != "jpeg":
            return ErrorResponseJson("invalid jpeg data").make_response()

        return func(*args, **kwargs)
    return wrapper
项目:pia    作者:soasme    | 项目源码 | 文件源码
def builtin_jq():
    """
    Builtin program: `jq`.
    Applies the filter from the ``program`` query argument (default ``.``)
    to the raw request body via ``jq`` and returns the output with an
    ``application/json`` content type. An invalid filter yields a 400
    response whose JSON body carries the error message.
    """
    jq_filter = request.args.get('program', ".")
    payload = request.data
    try:
        result = make_response(jq(jq_filter, payload))
        result.content_type = 'application/json'
    except InvalidJQFilter as err:
        return jsonify(message=str(err)), 400
    return result
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def reset_password(current_user):
    """
    Change the password of ``current_user``.

    Expects a JSON payload with ``oldPassword``, ``newPassword`` and
    ``passwordConfirmation``. The old password is verified against the stored
    hash before the new one is applied.

    :param current_user: User whose password is being changed
    :return: Http Response
    """
    if request.content_type != "application/json":
        return response('failed', 'Content type must be json', 400)

    payload = request.get_json()
    old_password = payload.get('oldPassword')
    new_password = payload.get('newPassword')
    password_confirmation = payload.get('passwordConfirmation')

    if not (old_password and new_password and password_confirmation):
        return response('failed', "Missing required attributes", 400)

    # The current password must match before anything else is validated.
    if not bcrypt.check_password_hash(current_user.password, old_password.encode('utf-8')):
        return response('failed', "Incorrect password", 401)

    if new_password != password_confirmation:
        return response('failed', 'New Passwords do not match', 400)

    if len(new_password) <= 4:
        return response('failed', 'New password should be greater than four characters long', 400)

    current_user.reset_password(new_password)
    return response('success', 'Password reset successfully', 200)


# Register classes as views
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def edit_bucket(current_user, bucket_id):
    """
    Rename one of the current user's Buckets.

    The request must be json and contain a non-empty ``name``; the bucket Id
    must be integer-like and belong to the user.

    :param current_user: Current User
    :param bucket_id: Bucket Id
    :return: Http Json response
    """
    if request.content_type != 'application/json':
        return response('failed', 'Content-type must be json', 202)

    new_name = request.get_json().get('name')
    if not new_name:
        return response('failed', 'No attribute or value was specified, nothing was changed', 400)

    # Only validates that the Id parses as an integer; the value is unused.
    try:
        int(bucket_id)
    except ValueError:
        return response('failed', 'Please provide a valid Bucket Id', 400)

    owner = User.get_by_id(current_user.id)
    user_bucket = owner.buckets.filter_by(id=bucket_id).first()
    if not user_bucket:
        return response('failed', 'The Bucket with Id ' + bucket_id + ' does not exist', 404)

    user_bucket.update(new_name)
    return response_for_created_bucket(user_bucket, 201)
项目:pia    作者:soasme    | 项目源码 | 文件源码
def builtin_echo():
    """
    Builtin program: `echo`.
    Sends the raw request body back to the caller, reusing the request's
    content type for the response.
    """
    echoed = make_response(request.data)
    echoed.content_type = request.content_type
    return echoed
项目:seedbox    作者:nailgun    | 项目源码 | 文件源码
def report(node):
    """Record a provisioning report for ``node`` and commit it.

    Outside maintenance mode, the ``version`` query argument becomes the
    node's active config version and the raw request body is stored as the
    ignition config of a new ``models.Provision``. In every case the node's
    disks get ``wipe_next_boot`` set to False and the session is committed.

    :param node: the node the report is about (project model instance)
    :return: an ``'ok'`` response with mimetype ``application/json``
    """
    if not node.maintenance_mode:
        try:
            # A missing argument gives int(None) -> TypeError; a non-numeric
            # one gives ValueError. Both are answered with 400.
            node.active_config_version = int(request.args.get('version'))
        except (ValueError, TypeError):
            return abort(400)

        # NOTE(review): this check runs after the node attribute above was
        # already assigned; nothing is committed on this path.
        if request.content_type != 'application/json':
            return abort(400)

        provision = models.Provision()
        provision.node = node
        provision.config_version = node.active_config_version
        provision.ignition_config = request.data
        # The iPXE config is only rendered when the reported version matches
        # the node's target version.
        if node.target_config_version == node.active_config_version:
            provision.ipxe_config = config_renderer.ipxe.render(node, request.url_root)
        models.db.session.add(provision)

    models.db.session.add(node)

    # Presumably a bulk update over the node's disks query — confirm against
    # the model definition.
    node.disks.update({
        models.Disk.wipe_next_boot: False
    })

    if node.cluster.are_etcd_nodes_configured:
        node.cluster.assert_etcd_cluster_exists = True
        models.db.session.add(node.cluster)

    models.db.session.commit()
    return Response('ok', mimetype='application/json')
项目:WebAct    作者:CreatCodeBuild    | 项目源码 | 文件源码
def image():
    """Pass the raw request body to ``process_image`` and return its result.

    The request content type and the body length are printed for debugging.
    """
    print('contentType', request.content_type)
    raw_body = request.data
    print('data length', len(raw_body))
    return process_image(raw_body)
项目:apizen    作者:blackmatrix7    | 项目源码 | 文件源码
def default_after_request(param):
    """Log a summary of the finished request/response pair.

    Stamps ``g.response_time``, gathers response attributes from ``param``
    together with request details stored on ``g``, and logs the resulting
    dict: at error level when the status code is 400 or above, at debug
    level otherwise.

    :param param: the response object being returned to the client
    :return: ``param``, unchanged
    """
    def _stamp(moment):
        # Format a datetime with the application-wide format string.
        return moment.strftime(current_app.config['APIZEN_DATETIME_FMT'])

    response_param = {
        'charset': param.charset,
        'content_length': param.content_length,
        'content_type': param.content_type,
        'content_encoding': param.content_encoding,
        'mimetype': param.mimetype,
        'response': getattr(g, 'result', None),
        'status': param.status,
        'status_code': param.status_code,
    }

    g.response_time = datetime.now()
    time_consuming = str(g.response_time - g.request_time)

    log_info = {
        'api_method': g.get('api_method'),
        'api_version': g.get('api_version'),
        'request_param': g.get('request_param'),
        'request_form': g.get('request_form'),
        'querystring': g.get('request_param')['query_string'],
        'request_json': g.get('request_json'),
        'response_param': response_param,
        'request_raw_data': g.request_raw_data,
        'request_time': _stamp(g.get('request_time')),
        'response_time': _stamp(g.get('response_time')),
        'time_consuming': time_consuming,
    }

    failed = param.status_code >= 400
    emit = current_app.logger.error if failed else current_app.logger.debug
    emit(log_info)
    return param


# ??????
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def edit_item(current_user, bucket_id, item_id):
    """
    Edit an item with a valid Id. The request content-type must be json and also the Bucket
    in which the item belongs must be among the user's Buckets.
    The name of the item must be present in the payload but the description is optional.
    :param current_user: User
    :param bucket_id: Bucket Id
    :param item_id: Item Id
    :return: Response of Edit Item
    """
    if request.content_type != 'application/json':
        return response('failed', 'Content-type must be application/json', 401)

    # Only validates that the Id parses as an integer; the value is unused.
    try:
        int(item_id)
    except ValueError:
        return response('failed', 'Provide a valid item Id', 202)

    # Get the user Bucket
    bucket = get_user_bucket(current_user, bucket_id)
    if bucket is None:
        # str() keeps the message building safe if the route hands over an int id.
        return response('failed', 'User has no Bucket with Id ' + str(bucket_id), 202)

    # Get the item
    item = bucket.items.filter_by(id=item_id).first()
    if not item:
        abort(404)

    # Check for Json data BEFORE reading attributes from it: get_json() may
    # return None (JSON ``null``), an empty object, or a non-dict value, and
    # calling ``.get`` on those would raise instead of producing this response.
    request_json_data = request.get_json()
    if not request_json_data or not isinstance(request_json_data, dict):
        return response('failed', 'No attributes specified in the request', 401)

    item_new_name = request_json_data.get('name')
    item_new_description = request_json_data.get('description', None)

    if not item_new_name:
        return response('failed', 'No name or value attribute found', 401)

    # Update the item record
    item.update(item_new_name, item_new_description)
    return response_with_bucket_item('success', item, 200)
项目:plex_autoscan    作者:l3uddz    | 项目源码 | 文件源码
def client_pushed():
    """Handle a scan request pushed by a client (manual, Radarr or Sonarr).

    The payload is read from the JSON body when the content type is
    ``application/json``, otherwise from the form data. Depending on which
    keys it contains, the referenced path is mapped with
    ``utils.map_pushed_path`` and handed to ``start_scan``.

    :return: a plain-text status message; aborts with 400 when the payload
        is missing, is not a mapping, or matches no known request shape.
    """
    if request.content_type == 'application/json':
        data = request.get_json(silent=True)
    else:
        data = request.form.to_dict()

    # A JSON array/scalar would pass a plain truthiness check and then break
    # the key lookups below, so it is rejected together with empty payloads.
    if not data or not isinstance(data, dict):
        logger.error("Invalid scan request from: %r", request.remote_addr)
        abort(400)
    logger.debug("Client %r request dump:\n%s", request.remote_addr, json.dumps(data, indent=4, sort_keys=True))

    if ('eventType' in data and data['eventType'] == 'Test') or ('EventType' in data and data['EventType'] == 'Test'):
        logger.info("Client %r made a test request, event: '%s'", request.remote_addr, 'Test')
    elif 'eventType' in data and data['eventType'] == 'Manual':
        logger.info("Client %r made a manual scan request for: '%s'", request.remote_addr, data['filepath'])
        final_path = utils.map_pushed_path(conf.configs, data['filepath'])
        # ignore this request?
        ignore, ignore_match = utils.should_ignore(final_path, conf.configs)
        if ignore:
            logger.info("Ignored scan request for '%s' because '%s' was matched from SERVER_IGNORE_LIST", final_path,
                        ignore_match)
            return "Ignoring scan request because %s was matched from your SERVER_IGNORE_LIST" % ignore_match
        if start_scan(final_path, 'manual', 'Manual'):
            return "'%s' was added to scan backlog." % final_path
        else:
            return "Error adding '%s' to scan backlog." % data['filepath']

    elif 'Movie' in data:
        logger.info("Client %r scan request for movie: '%s', event: '%s'", request.remote_addr,
                    data['Movie']['FilePath'], data['EventType'])
        final_path = utils.map_pushed_path(conf.configs, data['Movie']['FilePath'])
        start_scan(final_path, 'radarr', data['EventType'])
    elif 'movie' in data:
        # new radarr webhook
        path = os.path.join(data['movie']['folderPath'], data['movieFile']['relativePath'])
        event = "Upgrade" if data['isUpgrade'] else data['eventType']
        logger.info("Client %r scan request for movie: '%s', event: '%s'", request.remote_addr, path, event)
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'radarr', event)
    elif 'Series' in data:
        logger.info("Client %r scan request for series: '%s', event: '%s'", request.remote_addr, data['Series']['Path'],
                    data['EventType'])
        final_path = utils.map_pushed_path(conf.configs, data['Series']['Path'])
        start_scan(final_path, 'sonarr', data['EventType'])
    elif 'series' in data and 'episodeFile' in data:
        # new sonarr webhook
        # Both keys must be tested explicitly: the former
        # ``'series' and 'episodeFile' in data`` only checked 'episodeFile'.
        path = os.path.join(data['series']['path'], data['episodeFile']['relativePath'])
        event = "Upgrade" if data['isUpgrade'] else data['eventType']
        logger.info("Client %r scan request for series: '%s', event: '%s'", request.remote_addr, path, event)
        final_path = utils.map_pushed_path(conf.configs, path)
        start_scan(final_path, 'sonarr_dev', event)
    else:
        logger.error("Unknown scan request from: %r", request.remote_addr)
        abort(400)

    return "OK"


############################################################
# MAIN
############################################################